]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osd/: switch all users of PGTransaction to use the new structure
authorSamuel Just <sjust@redhat.com>
Fri, 12 Aug 2016 15:42:12 +0000 (08:42 -0700)
committerSamuel Just <sjust@redhat.com>
Thu, 17 Nov 2016 18:40:17 +0000 (10:40 -0800)
This patch removes ReplicatedBackend::PGTransaction and implemenations
and switches over all users.  Happily, do_osd_ops loses the mod_desc
cruft and OpContext::pending_attrs.  PGTransaction doesn't really
have a natural way to implement append, however.  In reality, I think
this is probably an improvement, but it does mean that copy_from's
final transaction is now filled in by a lambda rather than by
appending a transaction fragment.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/ECBackend.cc
src/osd/ECBackend.h
src/osd/ECTransaction.cc
src/osd/ECTransaction.h
src/osd/PGBackend.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 013a508d9886d2e2d5a3210a487b3f5ea571007d..11b05a5e027e835392ff55f8100cf7917092b809 100644 (file)
@@ -1333,32 +1333,6 @@ void ECBackend::dump_recovery_info(Formatter *f) const
   f->close_section();
 }
 
-PGBackend::PGTransaction *ECBackend::get_transaction()
-{
-  return new ECTransaction;
-}
-
-struct MustPrependHashInfo : public ObjectModDesc::Visitor {
-  enum { EMPTY, FOUND_APPEND, FOUND_CREATE_STASH } state;
-  MustPrependHashInfo() : state(EMPTY) {}
-  void append(uint64_t) {
-    if (state == EMPTY) {
-      state = FOUND_APPEND;
-    }
-  }
-  void rmobject(version_t) {
-    if (state == EMPTY) {
-      state = FOUND_CREATE_STASH;
-    }
-  }
-  void create() {
-    if (state == EMPTY) {
-      state = FOUND_CREATE_STASH;
-    }
-  }
-  bool must_prepend_hash_info() const { return state == FOUND_APPEND; }
-};
-
 void ECBackend::submit_transaction(
   const hobject_t &hoid,
   const eversion_t &at_version,
@@ -1390,10 +1364,10 @@ void ECBackend::submit_transaction(
   op->reqid = reqid;
   op->client_op = client_op;
   
-  op->t.reset(static_cast<ECTransaction*>(_t.release()));
+  op->t = std::move(_t);
 
   set<hobject_t, hobject_t::BitwiseComparator> need_hinfos;
-  op->t->get_append_objects(&need_hinfos);
+  ECTransaction::get_append_objects(*(op->t), &need_hinfos);
   for (set<hobject_t, hobject_t::BitwiseComparator>::iterator i = need_hinfos.begin();
        i != need_hinfos.end();
        ++i) {
@@ -1411,27 +1385,6 @@ void ECBackend::submit_transaction(
        ref));
   }
 
-  for (vector<pg_log_entry_t>::iterator i = op->log_entries.begin();
-       i != op->log_entries.end();
-       ++i) {
-    MustPrependHashInfo vis;
-    i->mod_desc.visit(&vis);
-    if (vis.must_prepend_hash_info()) {
-      dout(10) << __func__ << ": stashing HashInfo for "
-              << i->soid << " for entry " << *i << dendl;
-      assert(op->unstable_hash_infos.count(i->soid));
-      ObjectModDesc desc;
-      map<string, boost::optional<bufferlist> > old_attrs;
-      bufferlist old_hinfo;
-      ::encode(*(op->unstable_hash_infos[i->soid]), old_hinfo);
-      old_attrs[ECUtil::get_hinfo_key()] = old_hinfo;
-      desc.setattrs(old_attrs);
-      i->mod_desc.swap(desc);
-      i->mod_desc.claim_append(desc);
-      assert(i->mod_desc.can_rollback());
-    }
-  }
-
   dout(10) << __func__ << ": op " << *op << " starting" << dendl;
   start_write(op);
   writing.push_back(op);
@@ -1798,11 +1751,13 @@ void ECBackend::start_write(Op *op) {
   }
   ObjectStore::Transaction empty;
 
-  op->t->generate_transactions(
+  ECTransaction::generate_transactions(
+    *(op->t),
     op->unstable_hash_infos,
     ec_impl,
     get_parent()->get_info().pgid.pgid,
     sinfo,
+    op->log_entries,
     &trans,
     &(op->temp_added),
     &(op->temp_cleared));
index 8b6535953ee955d70ce43efd19adb65d4fb015d8..0612431ac39be2f235885dc6515acf412a772a15 100644 (file)
@@ -26,7 +26,6 @@ struct ECSubWrite;
 struct ECSubWriteReply;
 struct ECSubRead;
 struct ECSubReadReply;
-class ECTransaction;
 
 struct RecoveryMessages;
 class ECBackend : public PGBackend {
@@ -89,9 +88,6 @@ public:
 
   void dump_recovery_info(Formatter *f) const;
 
-  /// @see osd/ECTransaction.cc/h
-  PGTransaction *get_transaction();
-
   void submit_transaction(
     const hobject_t &hoid,
     const eversion_t &at_version,
@@ -350,6 +346,7 @@ public:
     eversion_t trim_to;
     eversion_t trim_rollback_to;
     vector<pg_log_entry_t> log_entries;
+    map<hobject_t, ObjectContextRef, hobject_t::BitwiseComparator> obc_map;
     boost::optional<pg_hit_set_history_t> updated_hit_set_history;
     Context *on_local_applied_sync;
     Context *on_all_applied;
@@ -358,7 +355,7 @@ public:
     osd_reqid_t reqid;
     OpRequestRef client_op;
 
-    std::unique_ptr<ECTransaction> t;
+    std::unique_ptr<PGTransaction> t;
 
     set<hobject_t, hobject_t::BitwiseComparator> temp_added;
     set<hobject_t, hobject_t::BitwiseComparator> temp_cleared;
index 0b5e4f4a5318af672ffaa1d10b93540d18505fa1..27657504b58cbe3674c0d875e37478922fde353e 100644 (file)
 
 #include <iostream>
 #include <vector>
+#include <vector>
 #include <sstream>
 
 #include "ECTransaction.h"
 #include "ECUtil.h"
 #include "os/ObjectStore.h"
+#include "common/inline_variant.h"
 
-struct AppendObjectsGenerator: public boost::static_visitor<void> {
-  set<hobject_t, hobject_t::BitwiseComparator> *out;
-  explicit AppendObjectsGenerator(set<hobject_t, hobject_t::BitwiseComparator> *out) : out(out) {}
-  void operator()(const ECTransaction::AppendOp &op) {
-    out->insert(op.oid);
-  }
-  void operator()(const ECTransaction::TouchOp &op) {
-    out->insert(op.oid);
-  }
-  void operator()(const ECTransaction::CloneOp &op) {
-    out->insert(op.source);
-    out->insert(op.target);
-  }
-  void operator()(const ECTransaction::RenameOp &op) {
-    out->insert(op.source);
-    out->insert(op.destination);
-  }
-  void operator()(const ECTransaction::StashOp &op) {
-    out->insert(op.oid);
-  }
-  void operator()(const ECTransaction::RemoveOp &op) {
-    out->insert(op.oid);
-  }
-  void operator()(const ECTransaction::SetAttrsOp &op) {}
-  void operator()(const ECTransaction::RmAttrOp &op) {}
-  void operator()(const ECTransaction::AllocHintOp &op) {}
-  void operator()(const ECTransaction::NoOp &op) {}
-};
 void ECTransaction::get_append_objects(
-  set<hobject_t, hobject_t::BitwiseComparator> *out) const
+  const PGTransaction &t,
+  set<hobject_t, hobject_t::BitwiseComparator> *out)
 {
-  AppendObjectsGenerator gen(out);
-  reverse_visit(gen);
+  for (auto &&i: t.op_map) {
+    out->insert(i.first);
+    hobject_t source;
+    if (i.second.has_source(&source))
+      out->insert(source);
+  }
 }
 
-struct TransGenerator : public boost::static_visitor<void> {
-  map<hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator> &hash_infos;
+void append(
+  pg_t pgid,
+  const hobject_t &oid,
+  const ECUtil::stripe_info_t &sinfo,
+  ErasureCodeInterfaceRef &ecimpl,
+  const set<int> &want,
+  uint64_t offset,
+  bufferlist &bl,
+  uint32_t flags,
+  ECUtil::HashInfoRef hinfo,
+  map<shard_id_t, ObjectStore::Transaction> *transactions) {
 
-  ErasureCodeInterfaceRef &ecimpl;
-  const pg_t pgid;
-  const ECUtil::stripe_info_t sinfo;
-  map<shard_id_t, ObjectStore::Transaction> *trans;
-  set<int> want;
-  set<hobject_t, hobject_t::BitwiseComparator> *temp_added;
-  set<hobject_t, hobject_t::BitwiseComparator> *temp_removed;
-  stringstream *out;
-  TransGenerator(
-    map<hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator> &hash_infos,
-    ErasureCodeInterfaceRef &ecimpl,
-    pg_t pgid,
-    const ECUtil::stripe_info_t &sinfo,
-    map<shard_id_t, ObjectStore::Transaction> *trans,
-    set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
-    set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
-    stringstream *out)
-    : hash_infos(hash_infos),
-      ecimpl(ecimpl), pgid(pgid),
-      sinfo(sinfo),
-      trans(trans),
-      temp_added(temp_added), temp_removed(temp_removed),
-      out(out) {
-    for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
-      want.insert(i);
-    }
-  }
+  assert(bl.length());
+  assert(offset % sinfo.get_stripe_width() == 0);
+  assert(
+    sinfo.aligned_logical_offset_to_chunk_offset(offset) ==
+    hinfo->get_total_chunk_size());
+  map<int, bufferlist> buffers;
 
-  coll_t get_coll_ct(shard_id_t shard, const hobject_t &hoid) {
-    if (hoid.is_temp()) {
-      temp_removed->erase(hoid);
-      temp_added->insert(hoid);
-    }
-    return get_coll(shard);
-  }
-  coll_t get_coll_rm(shard_id_t shard, const hobject_t &hoid) {
-    if (hoid.is_temp()) {
-      temp_added->erase(hoid);
-      temp_removed->insert(hoid);
-    }
-    return get_coll(shard);
-  }
-  coll_t get_coll(shard_id_t shard) {
-    return coll_t(spg_t(pgid, shard));
+  // align
+  if (bl.length() % sinfo.get_stripe_width())
+    bl.append_zero(
+      sinfo.get_stripe_width() -
+      ((offset + bl.length()) % sinfo.get_stripe_width()));
+  int r = ECUtil::encode(
+    sinfo, ecimpl, bl, want, &buffers);
+
+  hinfo->append(
+    sinfo.aligned_logical_offset_to_chunk_offset(offset),
+    buffers);
+  bufferlist hbuf;
+  ::encode(*hinfo, hbuf);
+
+  assert(r == 0);
+  for (auto &&i : *transactions) {
+    assert(buffers.count(i.first));
+    bufferlist &enc_bl = buffers[i.first];
+    i.second.set_alloc_hint(
+      coll_t(spg_t(pgid, i.first)),
+      ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+      0, 0,
+      CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
+      CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
+    i.second.write(
+      coll_t(spg_t(pgid, i.first)),
+      ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+      sinfo.logical_to_prev_chunk_offset(
+       offset),
+      enc_bl.length(),
+      enc_bl,
+      flags);
+    i.second.setattr(
+      coll_t(spg_t(pgid, i.first)),
+      ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+      ECUtil::get_hinfo_key(),
+      hbuf);
   }
+}
 
-  void operator()(const ECTransaction::TouchOp &op) {
-    for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
-        i != trans->end();
-        ++i) {
-      i->second.touch(
-       get_coll_ct(i->first, op.oid),
-       ghobject_t(op.oid, ghobject_t::NO_GEN, i->first));
+void ECTransaction::generate_transactions(
+  PGTransaction &t,
+  map<
+    hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator
+    > &hash_infos,
+  ErasureCodeInterfaceRef &ecimpl,
+  pg_t pgid,
+  const ECUtil::stripe_info_t &sinfo,
+  vector<pg_log_entry_t> &entries,
+  map<shard_id_t, ObjectStore::Transaction> *transactions,
+  set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
+  set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
+  stringstream *out)
+{
+  assert(transactions);
+  assert(temp_added);
+  assert(temp_removed);
 
-      /* No change, but write it out anyway in case the object did not
-       * previously exist. */
-      assert(hash_infos.count(op.oid));
-      ECUtil::HashInfoRef hinfo = hash_infos[op.oid];
-      bufferlist hbuf;
-      ::encode(
-       *hinfo,
-       hbuf);
-      i->second.setattr(
-       get_coll_ct(i->first, op.oid),
-       ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
-       ECUtil::get_hinfo_key(),
-       hbuf);
-    }
+  map<hobject_t, pg_log_entry_t*, hobject_t::BitwiseComparator> obj_to_log;
+  for (auto &&i: entries) {
+    obj_to_log.insert(make_pair(i.soid, &i));
   }
-  void operator()(const ECTransaction::AppendOp &op) {
-    uint64_t offset = op.off;
-    bufferlist bl(op.bl);
-    assert(bl.length());
-    assert(offset % sinfo.get_stripe_width() == 0);
-    map<int, bufferlist> buffers;
 
-    assert(hash_infos.count(op.oid));
-    ECUtil::HashInfoRef hinfo = hash_infos[op.oid];
+  t.safe_create_traverse(
+    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &opair) {
+    const hobject_t &oid = opair.first;
 
-    // align
-    if (bl.length() % sinfo.get_stripe_width())
-      bl.append_zero(
-       sinfo.get_stripe_width() -
-       ((offset + bl.length()) % sinfo.get_stripe_width()));
-    assert(bl.length() - op.bl.length() < sinfo.get_stripe_width());
-    int r = ECUtil::encode(
-      sinfo, ecimpl, bl, want, &buffers);
+    auto iter = obj_to_log.find(oid);
+    pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr;
 
-    hinfo->append(
-      sinfo.aligned_logical_offset_to_chunk_offset(op.off),
-      buffers);
-    bufferlist hbuf;
-    ::encode(
-      *hinfo,
-      hbuf);
+    if (entry && opair.second.updated_snaps) {
+      entry->mod_desc.update_snaps(opair.second.updated_snaps->first);
+      vector<snapid_t> snaps(
+       opair.second.updated_snaps->second.begin(),
+       opair.second.updated_snaps->second.end());
+      ::encode(snaps, entry->snaps);
+    }
 
-    assert(r == 0);
-    for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
-        i != trans->end();
-        ++i) {
-      assert(buffers.count(i->first));
-      bufferlist &enc_bl = buffers[i->first];
-      i->second.set_alloc_hint(
-       get_coll_ct(i->first, op.oid),
-       ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
-       0, 0,
-       CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
-       CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
-      i->second.write(
-       get_coll_ct(i->first, op.oid),
-       ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
-       sinfo.logical_to_prev_chunk_offset(
-         offset),
-       enc_bl.length(),
-       enc_bl,
-       op.fadvise_flags);
-      i->second.setattr(
-       get_coll_ct(i->first, op.oid),
-       ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
-       ECUtil::get_hinfo_key(),
-       hbuf);
+    ObjectContextRef obc;
+    auto obiter = t.obc_map.find(oid);
+    if (obiter != t.obc_map.end()) {
+      obc = obiter->second;
     }
-  }
-  void operator()(const ECTransaction::CloneOp &op) {
-    assert(hash_infos.count(op.source));
-    assert(hash_infos.count(op.target));
-    *(hash_infos[op.target]) = *(hash_infos[op.source]);
-    for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
-        i != trans->end();
-        ++i) {
-      i->second.clone(
-       get_coll_ct(i->first, op.source),
-       ghobject_t(op.source, ghobject_t::NO_GEN, i->first),
-       ghobject_t(op.target, ghobject_t::NO_GEN, i->first));
+    if (entry) {
+      assert(obc);
+    } else {
+      assert(oid.is_temp());
     }
-  }
-  void operator()(const ECTransaction::RenameOp &op) {
-    assert(hash_infos.count(op.source));
-    assert(hash_infos.count(op.destination));
-    *(hash_infos[op.destination]) = *(hash_infos[op.source]);
-    hash_infos[op.source]->clear();
-    for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
-        i != trans->end();
-        ++i) {
-      i->second.collection_move_rename(
-       get_coll_rm(i->first, op.source),
-       ghobject_t(op.source, ghobject_t::NO_GEN, i->first),
-       get_coll_ct(i->first, op.destination),
-       ghobject_t(op.destination, ghobject_t::NO_GEN, i->first));
+
+    map<string, boost::optional<bufferlist> > xattr_rollback;
+    ECUtil::HashInfoRef hinfo;
+    {
+      auto iter = hash_infos.find(oid);
+      assert(iter != hash_infos.end());
+      hinfo = iter->second;
+      bufferlist old_hinfo;
+      ::encode(*hinfo, old_hinfo);
+      xattr_rollback[ECUtil::get_hinfo_key()] = old_hinfo;
     }
-  }
-  void operator()(const ECTransaction::StashOp &op) {
-    assert(hash_infos.count(op.oid));
-    hash_infos[op.oid]->clear();
-    for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
-        i != trans->end();
-        ++i) {
-      coll_t cid(get_coll_rm(i->first, op.oid));
-      i->second.collection_move_rename(
-       cid,
-       ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
-       cid,
-       ghobject_t(op.oid, op.version, i->first));
+
+    if (opair.second.is_none() && opair.second.truncate) {
+      assert(opair.second.truncate->first == 0);
+      assert(opair.second.truncate->first ==
+            opair.second.truncate->second);
+      assert(entry);
+      assert(obc);
+
+      opair.second.truncate = boost::none;
+      opair.second.delete_first = true;
+      opair.second.init_type = PGTransaction::ObjectOperation::Init::Create();
+
+      if (obc) {
+       /* We need to reapply all of the cached xattrs.
+        * std::map insert fortunately only writes keys
+        * which don't already exist, so this should do
+        * the right thing. */
+       opair.second.attr_updates.insert(
+         obc->attr_cache.begin(),
+         obc->attr_cache.end());
+      }
     }
-  }
-  void operator()(const ECTransaction::RemoveOp &op) {
-    assert(hash_infos.count(op.oid));
-    hash_infos[op.oid]->clear();
-    for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
-        i != trans->end();
-        ++i) {
-      i->second.remove(
-       get_coll_rm(i->first, op.oid),
-       ghobject_t(op.oid, ghobject_t::NO_GEN, i->first));
+
+    if (oid.is_temp()) {
+      if (opair.second.is_fresh_object()) {
+       temp_added->insert(oid);
+      } else if (opair.second.is_delete()) {
+       temp_removed->insert(oid);
+      }
     }
-  }
-  void operator()(const ECTransaction::SetAttrsOp &op) {
-    map<string, bufferlist> attrs(op.attrs);
-    for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
-        i != trans->end();
-        ++i) {
-      i->second.setattrs(
-       get_coll_ct(i->first, op.oid),
-       ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
-       attrs);
+
+    if (opair.second.delete_first) {
+      /* We also want to remove the boost::none entries since
+       * the keys already won't exist */
+      for (auto j = opair.second.attr_updates.begin();
+          j != opair.second.attr_updates.end();
+       ) {
+       if (j->second) {
+         ++j;
+       } else {
+         opair.second.attr_updates.erase(j++);
+       }
+      }
+      /* Fill in all current entries for xattr rollback */
+      if (obc) {
+       xattr_rollback.insert(
+         obc->attr_cache.begin(),
+         obc->attr_cache.end());
+       obc->attr_cache.clear();
+      }
+      if (entry) {
+       entry->mod_desc.rmobject(entry->version.version);
+       for (auto &&st: *transactions) {
+         st.second.collection_move_rename(
+           coll_t(spg_t(pgid, st.first)),
+           ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+           coll_t(spg_t(pgid, st.first)),
+           ghobject_t(oid, entry->version.version, st.first));
+       }
+      } else {
+       for (auto &&st: *transactions) {
+         st.second.remove(
+           coll_t(spg_t(pgid, st.first)),
+           ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+       }
+      }
+      hinfo->clear();
     }
-  }
-  void operator()(const ECTransaction::RmAttrOp &op) {
-    for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
-        i != trans->end();
-        ++i) {
-      i->second.rmattr(
-       get_coll_ct(i->first, op.oid),
-       ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
-       op.key);
+
+    if (opair.second.is_fresh_object() && entry) {
+      entry->mod_desc.create();
     }
-  }
-  void operator()(const ECTransaction::AllocHintOp &op) {
-    // logical_to_next_chunk_offset() scales down both aligned and
-    // unaligned offsets
-    uint64_t object_size = sinfo.logical_to_next_chunk_offset(
-                                                    op.expected_object_size);
-    uint64_t write_size = sinfo.logical_to_next_chunk_offset(
-                                                   op.expected_write_size);
 
-    for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
-         i != trans->end();
-         ++i) {
-      i->second.set_alloc_hint(
-        get_coll_ct(i->first, op.oid),
-        ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
-        object_size, write_size, op.flags);
+    match(
+      opair.second.init_type,
+      [&](const PGTransaction::ObjectOperation::Init::None &) {},
+      [&](const PGTransaction::ObjectOperation::Init::Create &op) {
+       for (auto &&st: *transactions) {
+         st.second.touch(
+           coll_t(spg_t(pgid, st.first)),
+           ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+       }
+      },
+      [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
+       for (auto &&st: *transactions) {
+         st.second.clone(
+           coll_t(spg_t(pgid, st.first)),
+           ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
+           ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+       }
+
+       auto siter = hash_infos.find(op.source);
+       assert(siter != hash_infos.end());
+       *hinfo = *(siter->second);
+
+       if (obc) {
+         auto cobciter = t.obc_map.find(op.source);
+         assert(cobciter != t.obc_map.end());
+         obc->attr_cache = cobciter->second->attr_cache;
+       }
+      },
+      [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
+       assert(op.source.is_temp());
+       for (auto &&st: *transactions) {
+         st.second.collection_move_rename(
+           coll_t(spg_t(pgid, st.first)),
+           ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
+           coll_t(spg_t(pgid, st.first)),
+           ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+       }
+       auto siter = hash_infos.find(op.source);
+       assert(siter != hash_infos.end());
+       *hinfo = *(siter->second);
+       assert(obc->attr_cache.empty());
+      });
+
+    // omap, truncate not supported (except 0, handled above)
+    assert(!(opair.second.clear_omap));
+    assert(!(opair.second.truncate));
+    assert(!(opair.second.omap_header));
+    assert(opair.second.omap_updates.empty());
+
+    if (!opair.second.attr_updates.empty()) {
+      map<string, bufferlist> to_set;
+      for (auto &&j: opair.second.attr_updates) {
+       if (j.second) {
+         to_set[j.first] = *(j.second);
+       } else {
+         for (auto &&st : *transactions) {
+           st.second.rmattr(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+             j.first);
+         }
+       }
+       if (obc) {
+         auto citer = obc->attr_cache.find(j.first);
+         if (entry) {
+           if (citer != obc->attr_cache.end()) {
+             // won't overwrite anything we put in earlier
+             xattr_rollback.insert(
+               make_pair(
+                 j.first,
+                 boost::optional<bufferlist>(citer->second)));
+           } else {
+             // won't overwrite anything we put in earlier
+             xattr_rollback.insert(
+               make_pair(
+                 j.first,
+                 boost::none));
+           }
+         }
+         if (j.second) {
+           obc->attr_cache[j.first] = *(j.second);
+         } else if (citer != obc->attr_cache.end()) {
+           obc->attr_cache.erase(citer);
+         }
+       } else {
+         assert(!entry);
+       }
+      }
+      for (auto &&st : *transactions) {
+       st.second.setattrs(
+         coll_t(spg_t(pgid, st.first)),
+         ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+         to_set);
+      }
+      assert(!xattr_rollback.empty());
+    }
+    if (entry && !xattr_rollback.empty()) {
+      entry->mod_desc.setattrs(xattr_rollback);
     }
-  }
-  void operator()(const ECTransaction::NoOp &op) {}
-};
 
+    if (opair.second.alloc_hint) {
+      /* logical_to_next_chunk_offset() scales down both aligned and
+       * unaligned offsets
 
-void ECTransaction::generate_transactions(
-  map<hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator> &hash_infos,
-  ErasureCodeInterfaceRef &ecimpl,
-  pg_t pgid,
-  const ECUtil::stripe_info_t &sinfo,
-  map<shard_id_t, ObjectStore::Transaction> *transactions,
-  set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
-  set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
-  stringstream *out) const
-{
-  TransGenerator gen(
-    hash_infos,
-    ecimpl,
-    pgid,
-    sinfo,
-    transactions,
-    temp_added,
-    temp_removed,
-    out);
-  visit(gen);
+       * we don't bother to roll this back at this time for two reasons:
+       * 1) it's advisory
+       * 2) we don't track the old value */
+      uint64_t object_size = sinfo.logical_to_next_chunk_offset(
+       opair.second.alloc_hint->expected_object_size);
+      uint64_t write_size = sinfo.logical_to_next_chunk_offset(
+       opair.second.alloc_hint->expected_write_size);
+
+      for (auto &&st : *transactions) {
+       st.second.set_alloc_hint(
+         coll_t(spg_t(pgid, st.first)),
+         ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+         object_size,
+         write_size,
+         opair.second.alloc_hint->flags);
+      }
+    }
+
+
+    if (!opair.second.buffer_updates.empty()) {
+      set<int> want;
+      for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
+       want.insert(i);
+      }
+      if (entry) {
+       entry->mod_desc.append(
+         sinfo.aligned_chunk_offset_to_logical_offset(
+           hinfo->get_total_chunk_size()
+           ));
+      }
+      for (auto &&extent: opair.second.buffer_updates) {
+       using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
+       match(
+         extent.get_val(),
+         [&](const BufferUpdate::Write &op) {
+           if (extent.get_len()) {
+             assert(op.buffer.length() == extent.get_len());
+             bufferlist bl = op.buffer;
+             append(
+               pgid,
+               oid,
+               sinfo,
+               ecimpl,
+               want,
+               extent.get_off(),
+               bl,
+               op.fadvise_flags,
+               hinfo,
+               transactions);
+           }
+         },
+         [&](const BufferUpdate::Zero &) {
+           assert(
+             0 ==
+             "Zero is not allowed, do_op should have returned ENOTSUPP");
+         },
+         [&](const BufferUpdate::CloneRange &) {
+           assert(
+             0 ==
+             "CloneRange is not allowed, do_op should have returned ENOTSUPP");
+         });
+      }
+    }
+  });
 }
index f9129ea486bf814b9a71ca6766b8511cbf682d41..aa6bef5fabdf340a7e14a4657ef9580267d99d1c 100644 (file)
 #include "PGBackend.h"
 #include "ECUtil.h"
 #include "erasure-code/ErasureCodeInterface.h"
+#include "PGTransaction.h"
 
-class ECTransaction : public PGBackend::PGTransaction {
-public:
-  struct AppendOp {
-    hobject_t oid;
-    uint64_t off;
-    bufferlist bl;
-    uint32_t fadvise_flags;
-    AppendOp(const hobject_t &oid, uint64_t off, bufferlist &bl, uint32_t flags)
-      : oid(oid), off(off), bl(bl), fadvise_flags(flags) {}
-  };
-  struct CloneOp {
-    hobject_t source;
-    hobject_t target;
-    CloneOp(const hobject_t &source, const hobject_t &target)
-      : source(source), target(target) {}
-  };
-  struct RenameOp {
-    hobject_t source;
-    hobject_t destination;
-    RenameOp(const hobject_t &source, const hobject_t &destination)
-      : source(source), destination(destination) {}
-  };
-  struct StashOp {
-    hobject_t oid;
-    version_t version;
-    StashOp(const hobject_t &oid, version_t version)
-      : oid(oid), version(version) {}
-  };
-  struct TouchOp {
-    hobject_t oid;
-    explicit TouchOp(const hobject_t &oid) : oid(oid) {}
-  };
-  struct RemoveOp {
-    hobject_t oid;
-    explicit RemoveOp(const hobject_t &oid) : oid(oid) {}
-  };
-  struct SetAttrsOp {
-    hobject_t oid;
-    map<string, bufferlist> attrs;
-    SetAttrsOp(const hobject_t &oid, map<string, bufferlist> &_attrs)
-      : oid(oid) {
-      attrs.swap(_attrs);
-    }
-    SetAttrsOp(const hobject_t &oid, const string &key, bufferlist &val)
-      : oid(oid) {
-      attrs.insert(make_pair(key, val));
-    }
-  };
-  struct RmAttrOp {
-    hobject_t oid;
-    string key;
-    RmAttrOp(const hobject_t &oid, const string &key) : oid(oid), key(key) {}
-  };
-  struct AllocHintOp {
-    hobject_t oid;
-    uint64_t expected_object_size;
-    uint64_t expected_write_size;
-    uint32_t flags;
-    AllocHintOp(const hobject_t &oid,
-                uint64_t expected_object_size,
-                uint64_t expected_write_size,
-               uint32_t flags)
-      : oid(oid),
-       expected_object_size(expected_object_size),
-        expected_write_size(expected_write_size),
-       flags(flags) {}
-  };
-  struct NoOp {};
-  typedef boost::variant<
-    AppendOp,
-    CloneOp,
-    RenameOp,
-    StashOp,
-    TouchOp,
-    RemoveOp,
-    SetAttrsOp,
-    RmAttrOp,
-    AllocHintOp,
-    NoOp> Op;
-  list<Op> ops;
-  uint64_t written;
-
-  ECTransaction() : written(0) {}
-  /// Write
-  void touch(
-    const hobject_t &hoid) {
-    ops.push_back(TouchOp(hoid));
-  }
-  void append(
-    const hobject_t &hoid,
-    uint64_t off,
-    uint64_t len,
-    bufferlist &bl,
-    uint32_t fadvise_flags) {
-    if (len == 0) {
-      touch(hoid);
-      return;
-    }
-    written += len;
-    assert(len == bl.length());
-    ops.push_back(AppendOp(hoid, off, bl, fadvise_flags));
-  }
-  void stash(
-    const hobject_t &hoid,
-    version_t former_version) {
-    ops.push_back(StashOp(hoid, former_version));
-  }
-  void remove(
-    const hobject_t &hoid) {
-    ops.push_back(RemoveOp(hoid));
-  }
-  void setattrs(
-    const hobject_t &hoid,
-    map<string, bufferlist> &attrs) {
-    ops.push_back(SetAttrsOp(hoid, attrs));
-  }
-  void setattr(
-    const hobject_t &hoid,
-    const string &attrname,
-    bufferlist &bl) {
-    ops.push_back(SetAttrsOp(hoid, attrname, bl));
-  }
-  void rmattr(
-    const hobject_t &hoid,
-    const string &attrname) {
-    ops.push_back(RmAttrOp(hoid, attrname));
-  }
-  void clone(
-    const hobject_t &from,
-    const hobject_t &to) {
-    ops.push_back(CloneOp(from, to));
-  }
-  void rename(
-    const hobject_t &from,
-    const hobject_t &to) {
-    ops.push_back(RenameOp(from, to));
-  }
-  void set_alloc_hint(
-    const hobject_t &hoid,
-    uint64_t expected_object_size,
-    uint64_t expected_write_size,
-    uint32_t flags) {
-    ops.push_back(AllocHintOp(hoid, expected_object_size, expected_write_size,
-                             flags));
-  }
-
-  void append(PGTransaction *_to_append) {
-    ECTransaction *to_append = static_cast<ECTransaction*>(_to_append);
-    written += to_append->written;
-    to_append->written = 0;
-    ops.splice(ops.end(), to_append->ops,
-              to_append->ops.begin(), to_append->ops.end());
-  }
-  void nop() {
-    ops.push_back(NoOp());
-  }
-  bool empty() const {
-    return ops.empty();
-  }
-  uint64_t get_bytes_written() const {
-    return written;
-  }
-  template <typename T>
-  void visit(T &vis) const {
-    for (list<Op>::const_iterator i = ops.begin(); i != ops.end(); ++i) {
-      boost::apply_visitor(vis, *i);
-    }
-  }
-  template <typename T>
-  void reverse_visit(T &vis) const {
-    for (list<Op>::const_reverse_iterator i = ops.rbegin();
-        i != ops.rend();
-        ++i) {
-      boost::apply_visitor(vis, *i);
-    }
-  }
+namespace ECTransaction {
   void get_append_objects(
-     set<hobject_t, hobject_t::BitwiseComparator> *out) const;
+    const PGTransaction &t,
+    set<hobject_t, hobject_t::BitwiseComparator> *out);
   void generate_transactions(
-    map<hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator> &hash_infos,
+    PGTransaction &t,
+    map<
+      hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator
+      > &hash_infos,
     ErasureCodeInterfaceRef &ecimpl,
     pg_t pgid,
     const ECUtil::stripe_info_t &sinfo,
+    vector<pg_log_entry_t> &entries,
     map<shard_id_t, ObjectStore::Transaction> *transactions,
     set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
     set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
-    stringstream *out = 0) const;
+    stringstream *out = 0);
 };
 
 #endif
index 46c7d7ca66ec0db01df3f6f5c71a89de4616c794..6ff831ab273580c858c8559c0537dd3495565e9f 100644 (file)
@@ -24,6 +24,7 @@
 #include "os/ObjectStore.h"
 #include "common/LogClient.h"
 #include <string>
+#include "PGTransaction.h"
 
 namespace Scrub {
   class Store;
@@ -367,123 +368,6 @@ typedef ceph::shared_ptr<const OSDMap> OSDMapRef;
 
    virtual ~PGBackend() {}
 
-   /**
-    * Client IO Interface
-    */
-   class PGTransaction {
-   public:
-     /// Write
-     virtual void touch(
-       const hobject_t &hoid  ///< [in] obj to touch
-       ) = 0;
-     virtual void stash(
-       const hobject_t &hoid,   ///< [in] obj to remove
-       version_t former_version ///< [in] former object version
-       ) = 0;
-     virtual void remove(
-       const hobject_t &hoid ///< [in] obj to remove
-       ) = 0;
-     virtual void setattrs(
-       const hobject_t &hoid,         ///< [in] object to write
-       map<string, bufferlist> &attrs ///< [in] attrs, may be cleared
-       ) = 0;
-     virtual void setattr(
-       const hobject_t &hoid,         ///< [in] object to write
-       const string &attrname,        ///< [in] attr to write
-       bufferlist &bl                 ///< [in] val to write, may be claimed
-       ) = 0;
-     virtual void rmattr(
-       const hobject_t &hoid,         ///< [in] object to write
-       const string &attrname         ///< [in] attr to remove
-       ) = 0;
-     virtual void clone(
-       const hobject_t &from,
-       const hobject_t &to
-       ) = 0;
-     virtual void rename(
-       const hobject_t &from,
-       const hobject_t &to
-       ) = 0;
-     virtual void set_alloc_hint(
-       const hobject_t &hoid,
-       uint64_t expected_object_size,
-       uint64_t expected_write_size,
-       uint32_t flags
-       ) = 0;
-
-     /// Optional, not supported on ec-pool
-     virtual void write(
-       const hobject_t &hoid, ///< [in] object to write
-       uint64_t off,          ///< [in] off at which to write
-       uint64_t len,          ///< [in] len to write from bl
-       bufferlist &bl,        ///< [in] bl to write will be claimed to len
-       uint32_t fadvise_flags = 0 ///< [in] fadvise hint
-       ) { assert(0); }
-     virtual void omap_setkeys(
-       const hobject_t &hoid,         ///< [in] object to write
-       map<string, bufferlist> &keys  ///< [in] omap keys, may be cleared
-       ) { assert(0); }
-     virtual void omap_setkeys(
-       const hobject_t &hoid,         ///< [in] object to write
-       bufferlist &keys_bl  ///< [in] omap keys, may be cleared
-       ) { assert(0); }
-     virtual void omap_rmkeys(
-       const hobject_t &hoid,         ///< [in] object to write
-       set<string> &keys              ///< [in] omap keys, may be cleared
-       ) { assert(0); }
-     virtual void omap_rmkeys(
-       const hobject_t &hoid,         ///< [in] object to write
-       bufferlist &keys_bl            ///< [in] omap keys, may be cleared
-       ) { assert(0); }
-     virtual void omap_clear(
-       const hobject_t &hoid          ///< [in] object to clear omap
-       ) { assert(0); }
-     virtual void omap_setheader(
-       const hobject_t &hoid,         ///< [in] object to write
-       bufferlist &header             ///< [in] header
-       ) { assert(0); }
-     virtual void clone_range(
-       const hobject_t &from,         ///< [in] from
-       const hobject_t &to,           ///< [in] to
-       uint64_t fromoff,              ///< [in] offset
-       uint64_t len,                  ///< [in] len
-       uint64_t tooff                 ///< [in] offset
-       ) { assert(0); }
-     virtual void truncate(
-       const hobject_t &hoid,
-       uint64_t off
-       ) { assert(0); }
-     virtual void zero(
-       const hobject_t &hoid,
-       uint64_t off,
-       uint64_t len
-       ) { assert(0); }
-
-     /// Supported on all backends
-
-     /// off must be the current object size
-     virtual void append(
-       const hobject_t &hoid, ///< [in] object to write
-       uint64_t off,          ///< [in] off at which to write
-       uint64_t len,          ///< [in] len to write from bl
-       bufferlist &bl,        ///< [in] bl to write will be claimed to len
-       uint32_t fadvise_flags ///< [in] fadvise hint
-       ) { write(hoid, off, len, bl, fadvise_flags); }
-
-     /// to_append *must* have come from the same PGBackend (same concrete type)
-     virtual void append(
-       PGTransaction *to_append ///< [in] trans to append, to_append is cleared
-       ) = 0;
-     virtual void nop() = 0;
-     virtual bool empty() const = 0;
-     virtual uint64_t get_bytes_written() const = 0;
-     virtual ~PGTransaction() {}
-   };
-   using PGTransactionUPtr = std::unique_ptr<PGTransaction>;
-
-   /// Get implementation specific empty transaction
-   virtual PGTransaction *get_transaction() = 0;
-
    /// execute implementation specific transaction
    virtual void submit_transaction(
      const hobject_t &hoid,               ///< [in] object
index 1fcc6532fa5499fb1f756c01655f5910b68487ca..c3a02b0cce1e51b224b0e6e73ccdd1f913da6ddd 100644 (file)
@@ -351,222 +351,6 @@ void ReplicatedBackend::objects_read_async(
       new AsyncReadCallback(r, on_complete)));
 }
 
-
-class RPGTransaction : public PGBackend::PGTransaction {
-  coll_t coll;
-  set<hobject_t, hobject_t::BitwiseComparator> temp_added;
-  set<hobject_t, hobject_t::BitwiseComparator> temp_cleared;
-  mutable ObjectStore::Transaction t;
-  uint64_t written;
-  const coll_t &get_coll_ct(const hobject_t &hoid) {
-    if (hoid.is_temp()) {
-      temp_cleared.erase(hoid);
-      temp_added.insert(hoid);
-    }
-    return get_coll(hoid);
-  }
-  const coll_t &get_coll_rm(const hobject_t &hoid) {
-    if (hoid.is_temp()) {
-      temp_added.erase(hoid);
-      temp_cleared.insert(hoid);
-    }
-    return get_coll(hoid);
-  }
-  const coll_t &get_coll(const hobject_t &hoid) {
-    return coll;
-  }
-public:
-  RPGTransaction(coll_t coll)
-    : coll(coll), written(0) {}
-
-  /// Yields ownership of contained transaction
-  ObjectStore::Transaction&& get_transaction() {
-    return std::move(t);
-  }
-  const set<hobject_t, hobject_t::BitwiseComparator> &get_temp_added() {
-    return temp_added;
-  }
-  const set<hobject_t, hobject_t::BitwiseComparator> &get_temp_cleared() {
-    return temp_cleared;
-  }
-
-  void write(
-    const hobject_t &hoid,
-    uint64_t off,
-    uint64_t len,
-    bufferlist &bl,
-    uint32_t fadvise_flags
-    ) {
-    written += len;
-    t.write(get_coll_ct(hoid), ghobject_t(hoid), off, len, bl, fadvise_flags);
-  }
-  void remove(
-    const hobject_t &hoid
-    ) {
-    t.remove(get_coll_rm(hoid), ghobject_t(hoid));
-  }
-  void stash(
-    const hobject_t &hoid,
-    version_t former_version) {
-    t.collection_move_rename(
-      coll, ghobject_t(hoid), coll,
-      ghobject_t(hoid, former_version, shard_id_t::NO_SHARD));
-  }
-  void setattrs(
-    const hobject_t &hoid,
-    map<string, bufferlist> &attrs
-    ) {
-    t.setattrs(get_coll(hoid), ghobject_t(hoid), attrs);
-  }
-  void setattr(
-    const hobject_t &hoid,
-    const string &attrname,
-    bufferlist &bl
-    ) {
-    t.setattr(get_coll(hoid), ghobject_t(hoid), attrname, bl);
-  }
-  void rmattr(
-    const hobject_t &hoid,
-    const string &attrname
-    ) {
-    t.rmattr(get_coll(hoid), ghobject_t(hoid), attrname);
-  }
-  void omap_setkeys(
-    const hobject_t &hoid,
-    map<string, bufferlist> &keys
-    ) {
-    for (map<string, bufferlist>::iterator p = keys.begin(); p != keys.end(); ++p)
-      written += p->first.length() + p->second.length();
-    return t.omap_setkeys(get_coll(hoid), ghobject_t(hoid), keys);
-  }
-  void omap_setkeys(
-    const hobject_t &hoid,
-    bufferlist &keys_bl
-    ) {
-    written += keys_bl.length();
-    return t.omap_setkeys(get_coll(hoid), ghobject_t(hoid), keys_bl);
-  }
-  void omap_rmkeys(
-    const hobject_t &hoid,
-    set<string> &keys
-    ) {
-    t.omap_rmkeys(get_coll(hoid), ghobject_t(hoid), keys);
-  }
-  void omap_rmkeys(
-    const hobject_t &hoid,
-    bufferlist &keys_bl
-    ) {
-    t.omap_rmkeys(get_coll(hoid), ghobject_t(hoid), keys_bl);
-  }
-  void omap_clear(
-    const hobject_t &hoid
-    ) {
-    t.omap_clear(get_coll(hoid), ghobject_t(hoid));
-  }
-  void omap_setheader(
-    const hobject_t &hoid,
-    bufferlist &header
-    ) {
-    written += header.length();
-    t.omap_setheader(get_coll(hoid), ghobject_t(hoid), header);
-  }
-  void clone_range(
-    const hobject_t &from,
-    const hobject_t &to,
-    uint64_t fromoff,
-    uint64_t len,
-    uint64_t tooff
-    ) {
-    assert(get_coll(from) == get_coll_ct(to)  && get_coll(from) == coll);
-    t.clone_range(coll, ghobject_t(from), ghobject_t(to), fromoff, len, tooff);
-  }
-  void clone(
-    const hobject_t &from,
-    const hobject_t &to
-    ) {
-    assert(get_coll(from) == get_coll_ct(to)  && get_coll(from) == coll);
-    t.clone(coll, ghobject_t(from), ghobject_t(to));
-  }
-  void rename(
-    const hobject_t &from,
-    const hobject_t &to
-    ) {
-    t.collection_move_rename(
-      get_coll_rm(from),
-      ghobject_t(from),
-      get_coll_ct(to),
-      ghobject_t(to));
-  }
-
-  void touch(
-    const hobject_t &hoid
-    ) {
-    t.touch(get_coll_ct(hoid), ghobject_t(hoid));
-  }
-
-  void truncate(
-    const hobject_t &hoid,
-    uint64_t off
-    ) {
-    t.truncate(get_coll(hoid), ghobject_t(hoid), off);
-  }
-  void zero(
-    const hobject_t &hoid,
-    uint64_t off,
-    uint64_t len
-    ) {
-    t.zero(get_coll(hoid), ghobject_t(hoid), off, len);
-  }
-
-  void set_alloc_hint(
-    const hobject_t &hoid,
-    uint64_t expected_object_size,
-    uint64_t expected_write_size,
-    uint32_t flags
-    ) {
-    t.set_alloc_hint(get_coll(hoid), ghobject_t(hoid), expected_object_size,
-                    expected_write_size, flags);
-  }
-
-  using PGBackend::PGTransaction::append;
-  void append(
-    PGTransaction *_to_append
-    ) {
-    RPGTransaction *to_append = dynamic_cast<RPGTransaction*>(_to_append);
-    assert(to_append);
-    written += to_append->written;
-    to_append->written = 0;
-    t.append((to_append->t));
-    for (set<hobject_t, hobject_t::BitwiseComparator>::iterator i = to_append->temp_added.begin();
-        i != to_append->temp_added.end();
-        ++i) {
-      temp_cleared.erase(*i);
-      temp_added.insert(*i);
-    }
-    for (set<hobject_t, hobject_t::BitwiseComparator>::iterator i = to_append->temp_cleared.begin();
-        i != to_append->temp_cleared.end();
-        ++i) {
-      temp_added.erase(*i);
-      temp_cleared.insert(*i);
-    }
-  }
-  void nop() {
-    t.nop();
-  }
-  bool empty() const {
-    return t.empty(); 
-  }
-  uint64_t get_bytes_written() const {
-    return written;
-  }
-  ~RPGTransaction() { }
-};
-
-PGBackend::PGTransaction *ReplicatedBackend::get_transaction()
-{
-  return new RPGTransaction(coll);
-}
-
 class C_OSD_OnOpCommit : public Context {
   ReplicatedBackend *pg;
   ReplicatedBackend::InProgressOp *op;
@@ -589,13 +373,158 @@ public:
   }
 };
 
+void generate_transaction(
+  PGTransactionUPtr &pgt,
+  const coll_t &coll,
+  vector<pg_log_entry_t> &log_entries,
+  ObjectStore::Transaction *t,
+  set<hobject_t, hobject_t::BitwiseComparator> *added,
+  set<hobject_t, hobject_t::BitwiseComparator> *removed)
+{
+  assert(t);
+  assert(added);
+  assert(removed);
+
+  for (auto &&le: log_entries) {
+    le.mod_desc.mark_unrollbackable();
+    auto oiter = pgt->op_map.find(le.soid);
+    if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
+      vector<snapid_t> snaps(
+       oiter->second.updated_snaps->second.begin(),
+       oiter->second.updated_snaps->second.end());
+      ::encode(snaps, le.snaps);
+    }
+  }
+
+  pgt->safe_create_traverse(
+    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
+      const hobject_t &oid = obj_op.first;
+      const ghobject_t goid =
+       ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
+      const PGTransaction::ObjectOperation &op = obj_op.second;
+
+      if (oid.is_temp()) {
+       if (op.is_fresh_object()) {
+         added->insert(oid);
+       } else if (op.is_delete()) {
+         removed->insert(oid);
+       }
+      }
+
+      if (op.delete_first) {
+       t->remove(coll, goid);
+      }
+
+      match(
+       op.init_type,
+       [&](const PGTransaction::ObjectOperation::Init::None &) {
+       },
+       [&](const PGTransaction::ObjectOperation::Init::Create &op) {
+         t->touch(coll, goid);
+       },
+       [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
+         t->clone(
+           coll,
+           ghobject_t(
+             op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
+           goid);
+       },
+       [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
+         assert(op.source.is_temp());
+         t->collection_move_rename(
+           coll,
+           ghobject_t(
+             op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
+           coll,
+           goid);
+       });
+
+      if (op.truncate) {
+       t->truncate(coll, goid, op.truncate->first);
+       if (op.truncate->first != op.truncate->second)
+         t->truncate(coll, goid, op.truncate->second);
+      }
+
+      if (!op.attr_updates.empty()) {
+       map<string, bufferlist> attrs;
+       for (auto &&p: op.attr_updates) {
+         if (p.second)
+           attrs[p.first] = *(p.second);
+         else
+           t->rmattr(coll, goid, p.first);
+       }
+       t->setattrs(coll, goid, attrs);
+      }
+
+      if (op.clear_omap)
+       t->omap_clear(coll, goid);
+      if (op.omap_header)
+       t->omap_setheader(coll, goid, *(op.omap_header));
+
+      for (auto &&up: op.omap_updates) {
+       using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
+       switch (up.first) {
+       case UpdateType::Remove:
+         t->omap_rmkeys(coll, goid, up.second);
+         break;
+       case UpdateType::Insert:
+         t->omap_setkeys(coll, goid, up.second);
+         break;
+       }
+      }
+
+      // updated_snaps doesn't matter since we marked unrollbackable
+
+      if (op.alloc_hint) {
+       auto &hint = *(op.alloc_hint);
+       t->set_alloc_hint(
+         coll,
+         goid,
+         hint.expected_object_size,
+         hint.expected_write_size,
+         hint.flags);
+      }
+
+      for (auto &&extent: op.buffer_updates) {
+       using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
+       match(
+         extent.get_val(),
+         [&](const BufferUpdate::Write &op) {
+           t->write(
+             coll,
+             goid,
+             extent.get_off(),
+             extent.get_len(),
+             op.buffer);
+         },
+         [&](const BufferUpdate::Zero &op) {
+           t->zero(
+             coll,
+             goid,
+             extent.get_off(),
+             extent.get_len());
+         },
+         [&](const BufferUpdate::CloneRange &op) {
+           assert(op.len == extent.get_len());
+           t->clone_range(
+             coll,
+             ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
+             goid,
+             op.offset,
+             extent.get_len(),
+             extent.get_off());
+         });
+      }
+    });
+}
+
 void ReplicatedBackend::submit_transaction(
   const hobject_t &soid,
   const eversion_t &at_version,
   PGTransactionUPtr &&_t,
   const eversion_t &trim_to,
   const eversion_t &trim_rollback_to,
-  const vector<pg_log_entry_t> &log_entries,
+  const vector<pg_log_entry_t> &_log_entries,
   boost::optional<pg_hit_set_history_t> &hset_history,
   Context *on_local_applied_sync,
   Context *on_all_acked,
@@ -604,13 +533,19 @@ void ReplicatedBackend::submit_transaction(
   osd_reqid_t reqid,
   OpRequestRef orig_op)
 {
-  std::unique_ptr<RPGTransaction> t(
-    static_cast<RPGTransaction*>(_t.release()));
-  assert(t);
-  ObjectStore::Transaction op_t = t->get_transaction();
-
-  assert(t->get_temp_added().size() <= 1);
-  assert(t->get_temp_cleared().size() <= 1);
+  vector<pg_log_entry_t> log_entries(_log_entries);
+  ObjectStore::Transaction op_t;
+  PGTransactionUPtr t(std::move(_t));
+  set<hobject_t, hobject_t::BitwiseComparator> added, removed;
+  generate_transaction(
+    t,
+    coll,
+    log_entries,
+    &op_t,
+    &added,
+    &removed);
+  assert(added.size() <= 1);
+  assert(removed.size() <= 1);
 
   assert(!in_progress_ops.count(tid));
   InProgressOp &op = in_progress_ops.insert(
@@ -629,7 +564,6 @@ void ReplicatedBackend::submit_transaction(
     parent->get_actingbackfill_shards().begin(),
     parent->get_actingbackfill_shards().end());
 
-
   issue_op(
     soid,
     at_version,
@@ -637,18 +571,15 @@ void ReplicatedBackend::submit_transaction(
     reqid,
     trim_to,
     trim_rollback_to,
-    t->get_temp_added().empty() ? hobject_t() : *(t->get_temp_added().begin()),
-    t->get_temp_cleared().empty() ?
-      hobject_t() : *(t->get_temp_cleared().begin()),
+    added.size() ? *(added.begin()) : hobject_t(),
+    removed.size() ? *(removed.begin()) : hobject_t(),
     log_entries,
     hset_history,
     &op,
     op_t);
 
-  if (!(t->get_temp_added().empty())) {
-    add_temp_objs(t->get_temp_added());
-  }
-  clear_temp_objs(t->get_temp_cleared());
+  add_temp_objs(added);
+  clear_temp_objs(removed);
 
   parent->log_operation(
     log_entries,
index f7da28d60dbd663b2a6a756750ef6123555f6cf5..9ad29e4e3ed5cadec487e8d3bb8f22b5cfe1312e 100644 (file)
@@ -340,7 +340,6 @@ private:
   };
   map<ceph_tid_t, InProgressOp> in_progress_ops;
 public:
-  PGTransaction *get_transaction();
   friend class C_OSD_OnOpCommit;
   friend class C_OSD_OnOpApplied;
   void submit_transaction(
index a220919228e85c5b41ff331d1b2ae257add99f82..4ea736fb3ec38ef128b75749d0f0a443b32cb344 100644 (file)
@@ -2244,6 +2244,7 @@ void ReplicatedPG::do_op(OpRequestRef& op)
     close_op_ctx(ctx);
     return;
   }
+  dout(20) << __func__ << " obc " << *obc << dendl;
 
   if (r) {
     dout(20) << __func__ << " returned an error: " << r << dendl;
@@ -3054,7 +3055,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
 
   // this method must be idempotent since we may call it several times
   // before we finally apply the resulting transaction.
-  ctx->op_t.reset(pgbackend->get_transaction());
+  ctx->op_t.reset(new PGTransaction);
 
   if (op->may_write() || op->may_cache()) {
     // snap
@@ -3623,7 +3624,7 @@ ReplicatedPG::OpContextUPtr ReplicatedPG::trim_object(const hobject_t &coid)
 
   ctx->at_version = get_next_version();
 
-  PGBackend::PGTransaction *t = ctx->op_t.get();
+  PGTransaction *t = ctx->op_t.get();
  
   if (new_snaps.empty()) {
     // remove clone
@@ -3681,20 +3682,11 @@ ReplicatedPG::OpContextUPtr ReplicatedPG::trim_object(const hobject_t &coid)
        ctx->mtime,
        0)
       );
-    if (pool.info.require_rollback()) {
-      set<snapid_t> snaps(
-       ctx->obc->obs.oi.snaps.begin(),
-       ctx->obc->obs.oi.snaps.end());
-      ctx->log.back().mod_desc.update_snaps(snaps);
-      if (ctx->log.back().mod_desc.rmobject(ctx->at_version.version)) {
-       t->stash(coid, ctx->at_version.version);
-      } else {
-       t->remove(coid);
-      }
-    } else {
-      t->remove(coid);
-      ctx->log.back().mod_desc.mark_unrollbackable();
-    }
+    t->remove(coid);
+    t->update_snaps(
+      coid,
+      old_snaps,
+      new_snaps);
     ctx->at_version.version++;
   } else {
     // save adjusted snaps for this object
@@ -3706,7 +3698,7 @@ ReplicatedPG::OpContextUPtr ReplicatedPG::trim_object(const hobject_t &coid)
     coi.version = ctx->at_version;
     bl.clear();
     ::encode(coi, bl, get_osdmap()->get_up_osd_features());
-    setattr_maybe_cache(ctx->obc, ctx.get(), t, OI_ATTR, bl);
+    t->setattr(coid, OI_ATTR, bl);
 
     ctx->log.push_back(
       pg_log_entry_t(
@@ -3719,19 +3711,11 @@ ReplicatedPG::OpContextUPtr ReplicatedPG::trim_object(const hobject_t &coid)
        ctx->mtime,
        0)
       );
-    if (pool.info.require_rollback()) {
-      set<string> changing;
-      changing.insert(OI_ATTR);
-      ctx->obc->fill_in_setattrs(changing, &(ctx->log.back().mod_desc));
-      set<snapid_t> snaps(
-       ctx->obc->obs.oi.snaps.begin(),
-       ctx->obc->obs.oi.snaps.end());
-      ctx->log.back().mod_desc.update_snaps(old_snaps);
-    } else {
-      ctx->log.back().mod_desc.mark_unrollbackable();
-    }
-    
-    ::encode(coi.snaps, ctx->log.back().snaps);
+
+    t->update_snaps(
+      coid,
+      old_snaps,
+      new_snaps);
     ctx->at_version.version++;
   }
 
@@ -3754,16 +3738,7 @@ ReplicatedPG::OpContextUPtr ReplicatedPG::trim_object(const hobject_t &coid)
 
     ctx->snapset_obc->obs.exists = false;
     
-    if (pool.info.require_rollback()) {
-      if (ctx->log.back().mod_desc.rmobject(ctx->at_version.version)) {
-       t->stash(snapoid, ctx->at_version.version);
-      } else {
-       t->remove(snapoid);
-      }
-    } else {
-      t->remove(snapoid);
-      ctx->log.back().mod_desc.mark_unrollbackable();
-    }
+    t->remove(snapoid);
   } else {
     dout(10) << coid << " filtering snapset on " << snapoid << dendl;
     snapset.filter(pool.info);
@@ -3793,16 +3768,7 @@ ReplicatedPG::OpContextUPtr ReplicatedPG::trim_object(const hobject_t &coid)
     bl.clear();
     ::encode(ctx->snapset_obc->obs.oi, bl, get_osdmap()->get_up_osd_features());
     attrs[OI_ATTR].claim(bl);
-    setattrs_maybe_cache(ctx->snapset_obc, ctx.get(), t, attrs);
-
-    if (pool.info.require_rollback()) {
-      set<string> changing;
-      changing.insert(OI_ATTR);
-      changing.insert(SS_ATTR);
-      ctx->snapset_obc->fill_in_setattrs(changing, &(ctx->log.back().mod_desc));
-    } else {
-      ctx->log.back().mod_desc.mark_unrollbackable();
-    }
+    t->setattrs(snapoid, attrs);
   }
 
   return ctx;
@@ -4305,21 +4271,22 @@ static string list_entries(const T& m) {
   return s;
 }
 
-bool ReplicatedPG::maybe_create_new_object(OpContext *ctx)
+void ReplicatedPG::maybe_create_new_object(
+  OpContext *ctx,
+  bool ignore_transaction)
 {
   ObjectState& obs = ctx->new_obs;
   if (!obs.exists) {
     ctx->delta_stats.num_objects++;
     obs.exists = true;
     obs.oi.new_object();
-    return true;
+    if (!ignore_transaction)
+      ctx->op_t->create(obs.oi.soid);
   } else if (obs.oi.is_whiteout()) {
     dout(10) << __func__ << " clearing whiteout on " << obs.oi.soid << dendl;
     ctx->new_obs.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
     --ctx->delta_stats.num_whiteouts;
-    return true;
   }
-  return false;
 }
 
 int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
@@ -4332,7 +4299,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
   bool first_read = true;
 
-  PGBackend::PGTransaction* t = ctx->op_t.get();
+  PGTransaction* t = ctx->op_t.get();
 
   dout(10) << "do_osd_op " << soid << " " << ops << dendl;
 
@@ -5158,10 +5125,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       ++ctx->num_write;
       {
        tracepoint(osd, do_osd_op_pre_setallochint, soid.oid.name.c_str(), soid.snap.val, op.alloc_hint.expected_object_size, op.alloc_hint.expected_write_size);
-       if (maybe_create_new_object(ctx)) {
-          ctx->mod_desc.create();
-          t->touch(soid);
-       }
+       maybe_create_new_object(ctx);
        oi.expected_object_size = op.alloc_hint.expected_object_size;
        oi.expected_write_size = op.alloc_hint.expected_write_size;
        oi.alloc_hint_flags = op.alloc_hint.flags;
@@ -5202,15 +5166,9 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            result = -EOPNOTSUPP;
            break;
          }
-         ctx->mod_desc.create();
-       } else if (op.extent.offset == oi.size) {
-         ctx->mod_desc.append(oi.size);
-       } else {
-         ctx->mod_desc.mark_unrollbackable();
-         if (pool.info.require_rollback()) {
-           result = -EOPNOTSUPP;
-           break;
-         }
+       } else if (op.extent.offset != oi.size && pool.info.require_rollback()) {
+         result = -EOPNOTSUPP;
+         break;
        }
 
         if (seq && (seq > op.extent.truncate_seq) &&
@@ -5246,13 +5204,21 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        result = check_offset_and_length(op.extent.offset, op.extent.length, cct->_conf->osd_max_object_size);
        if (result < 0)
          break;
-       if (pool.info.require_rollback()) {
-         t->append(soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
+
+       maybe_create_new_object(ctx);
+
+       if (op.extent.length == 0) {
+         if (op.extent.offset > oi.size) {
+           t->truncate(
+             soid, op.extent.offset);
+         } else {
+           t->nop(soid);
+         }
        } else {
-         t->write(soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
+         t->write(
+           soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
        }
 
-       maybe_create_new_object(ctx);
        if (op.extent.offset == 0 && op.extent.length >= oi.size)
          obs.oi.set_data_digest(osd_op.indata.crc32c(-1));
        else if (op.extent.offset == oi.size && obs.oi.is_data_digest())
@@ -5281,40 +5247,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        if (pool.info.has_flag(pg_pool_t::FLAG_WRITE_FADVISE_DONTNEED))
          op.flags = op.flags | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
 
-       if (pool.info.require_rollback()) {
-         if (obs.exists) {
-           if (ctx->mod_desc.rmobject(ctx->at_version.version)) {
-             t->stash(soid, ctx->at_version.version);
-           } else {
-             t->remove(soid);
-           }
-         }
-         ctx->mod_desc.create();
-         t->append(soid, 0, op.extent.length, osd_op.indata, op.flags);
-         if (obs.exists) {
-           map<string, bufferlist> to_set = ctx->obc->attr_cache;
-           map<string, boost::optional<bufferlist> > &overlay =
-             ctx->pending_attrs[ctx->obc];
-           for (map<string, boost::optional<bufferlist> >::iterator i =
-                  overlay.begin();
-                i != overlay.end();
-                ++i) {
-             if (i->second) {
-               to_set[i->first] = *(i->second);
-             } else {
-               to_set.erase(i->first);
-             }
-           }
-           t->setattrs(soid, to_set);
-         }
-       } else {
-         ctx->mod_desc.mark_unrollbackable();
+       maybe_create_new_object(ctx);
+       t->truncate(soid, 0);
+       if (op.extent.length) {
          t->write(soid, 0, op.extent.length, osd_op.indata, op.flags);
-         if (obs.exists && op.extent.length < oi.size) {
-           t->truncate(soid, op.extent.length);
-         }
        }
-       maybe_create_new_object(ctx);
        obs.oi.set_data_digest(osd_op.indata.crc32c(-1));
 
        write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
@@ -5347,7 +5284,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          break;
        assert(op.extent.length);
        if (obs.exists && !oi.is_whiteout()) {
-         ctx->mod_desc.mark_unrollbackable();
          t->zero(soid, op.extent.offset, op.extent.length);
          interval_set<uint64_t> ch;
          ch.insert(op.extent.offset, op.extent.length);
@@ -5381,14 +5317,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            // category is no longer implemented.
          }
           if (result >= 0) {
-           bool is_whiteout = obs.exists && oi.is_whiteout();
-           if (maybe_create_new_object(ctx)) {
-             ctx->mod_desc.create();
-             t->touch(soid);
-           } else if (is_whiteout) {
-             // to change whiteout to non-whiteout, it need an op to update xattr
-             t->nop();
-           }
+           maybe_create_new_object(ctx);
+           t->nop(soid);
           }
        }
       }
@@ -5405,7 +5335,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        break;
       }
       ++ctx->num_write;
-      ctx->mod_desc.mark_unrollbackable();
       {
        // truncate
        if (!obs.exists || oi.is_whiteout()) {
@@ -5431,6 +5360,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          oi.truncate_size = op.extent.truncate_size;
        }
 
+       maybe_create_new_object(ctx);
        t->truncate(soid, op.extent.offset);
        if (oi.size > op.extent.offset) {
          interval_set<uint64_t> trim;
@@ -5457,7 +5387,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_CLONERANGE:
       tracepoint(osd, do_osd_op_pre_clonerange, soid.oid.name.c_str(), soid.snap.val, op.clonerange.offset, op.clonerange.length, op.clonerange.src_offset);
-      ctx->mod_desc.mark_unrollbackable();
       if (pool.info.require_rollback()) {
        result = -EOPNOTSUPP;
        break;
@@ -5465,9 +5394,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       ++ctx->num_read;
       ++ctx->num_write;
       {
-       if (maybe_create_new_object(ctx)) {
-         t->touch(obs.oi.soid);
-       }
+       maybe_create_new_object(ctx);
        if (op.clonerange.src_offset + op.clonerange.length > src_obc->obs.oi.size) {
          dout(10) << " clonerange source " << osd_op.soid << " "
                   << op.clonerange.src_offset << "~" << op.clonerange.length
@@ -5520,7 +5447,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          } else {
            dout(10) << " registered new watch " << w << " by " << entity << dendl;
            oi.watchers[make_pair(cookie, entity)] = w;
-           t->nop();  // make sure update the object_info on disk!
+           t->nop(soid);  // make sure update the object_info on disk!
          }
          bool will_ping = (op.watch.op == CEPH_OSD_WATCH_OP_WATCH);
          ctx->watch_connects.push_back(make_pair(w, will_ping));
@@ -5554,7 +5481,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            dout(10) << " removed watch " << oi_iter->second << " by "
                     << entity << dendl;
             oi.watchers.erase(oi_iter);
-           t->nop();  // update oi on disk
+           t->nop(soid);  // update oi on disk
            ctx->watch_disconnects.push_back(
              watch_disconnect_t(cookie, entity, false));
          } else {
@@ -5632,30 +5559,14 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -ENAMETOOLONG;
          break;
        }
-       if (maybe_create_new_object(ctx)) {
-         ctx->mod_desc.create();
-         t->touch(soid);
-       }
+       maybe_create_new_object(ctx);
        string aname;
        bp.copy(op.xattr.name_len, aname);
        tracepoint(osd, do_osd_op_pre_setxattr, soid.oid.name.c_str(), soid.snap.val, aname.c_str());
        string name = "_" + aname;
-       if (pool.info.require_rollback()) {
-         map<string, boost::optional<bufferlist> > to_set;
-         bufferlist old;
-         int r = getattr_maybe_cache(ctx->obc, name, &old);
-         if (r == 0) {
-           to_set[name] = old;
-         } else {
-           to_set[name];
-         }
-         ctx->mod_desc.setattrs(to_set);
-       } else {
-         ctx->mod_desc.mark_unrollbackable();
-       }
        bufferlist bl;
        bp.copy(op.xattr.value_len, bl);
-       setattr_maybe_cache(ctx->obc, ctx, t, name, bl);
+       t->setattr(soid, name, bl);
        ctx->delta_stats.num_wr++;
       }
       break;
@@ -5671,20 +5582,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          break;
        }
        string name = "_" + aname;
-       if (pool.info.require_rollback()) {
-         map<string, boost::optional<bufferlist> > to_set;
-         bufferlist old;
-         int r = getattr_maybe_cache(ctx->obc, name, &old);
-         if (r == 0) {
-           to_set[name] = old;
-         } else {
-           to_set[name];
-         }
-         ctx->mod_desc.setattrs(to_set);
-       } else {
-         ctx->mod_desc.mark_unrollbackable();
-       }
-       rmattr_maybe_cache(ctx->obc, ctx, t, name);
+       t->rmattr(soid, name);
        ctx->delta_stats.num_wr++;
       }
       break;
@@ -5710,7 +5608,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_STARTSYNC:
       tracepoint(osd, do_osd_op_pre_startsync, soid.oid.name.c_str(), soid.snap.val);
-      t->nop();
+      t->nop(soid);
       break;
 
 
@@ -6005,12 +5903,9 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        tracepoint(osd, do_osd_op_pre_omapsetvals, soid.oid.name.c_str(), soid.snap.val);
        break;
       }
-      ctx->mod_desc.mark_unrollbackable();
       ++ctx->num_write;
       {
-       if (maybe_create_new_object(ctx)) {
-         t->touch(soid);
-       }
+       maybe_create_new_object(ctx);
        bufferlist to_set_bl;
        try {
          decode_str_str_map_to_bl(bp, &to_set_bl);
@@ -6045,12 +5940,9 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        result = -EOPNOTSUPP;
        break;
       }
-      ctx->mod_desc.mark_unrollbackable();
       ++ctx->num_write;
       {
-       if (maybe_create_new_object(ctx)) {
-         t->touch(soid);
-       }
+       maybe_create_new_object(ctx);
        t->omap_setheader(soid, osd_op.indata);
        ctx->delta_stats.num_wr++;
       }
@@ -6064,7 +5956,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        result = -EOPNOTSUPP;
        break;
       }
-      ctx->mod_desc.mark_unrollbackable();
       ++ctx->num_write;
       {
        if (!obs.exists || oi.is_whiteout()) {
@@ -6086,7 +5977,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        tracepoint(osd, do_osd_op_pre_omaprmkeys, soid.oid.name.c_str(), soid.snap.val);
        break;
       }
-      ctx->mod_desc.mark_unrollbackable();
       ++ctx->num_write;
       {
        if (!obs.exists || oi.is_whiteout()) {
@@ -6266,23 +6156,12 @@ inline int ReplicatedPG::_delete_oid(OpContext *ctx, bool no_whiteout)
   ObjectState& obs = ctx->new_obs;
   object_info_t& oi = obs.oi;
   const hobject_t& soid = oi.soid;
-  PGBackend::PGTransaction* t = ctx->op_t.get();
+  PGTransaction* t = ctx->op_t.get();
 
   if (!obs.exists || (obs.oi.is_whiteout() && !no_whiteout))
     return -ENOENT;
 
-  if (pool.info.require_rollback()) {
-    if (ctx->mod_desc.rmobject(ctx->at_version.version)) {
-      t->stash(soid, ctx->at_version.version);
-    } else {
-      t->remove(soid);
-    }
-    map<string, bufferlist> new_attrs;
-    replace_cached_attrs(ctx, ctx->obc, new_attrs);
-  } else {
-    ctx->mod_desc.mark_unrollbackable();
-    t->remove(soid);
-  }
+  t->remove(soid);
 
   if (oi.size > 0) {
     interval_set<uint64_t> ch;
@@ -6316,7 +6195,7 @@ inline int ReplicatedPG::_delete_oid(OpContext *ctx, bool no_whiteout)
     dout(20) << __func__ << " setting whiteout on " << soid << dendl;
     oi.set_flag(object_info_t::FLAG_WHITEOUT);
     ctx->delta_stats.num_whiteouts++;
-    t->touch(soid);
+    t->create(soid);
     osd->logger->inc(l_osd_tier_whiteout);
     return 0;
   }
@@ -6343,7 +6222,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
   ObjectState& obs = ctx->new_obs;
   object_info_t& oi = obs.oi;
   const hobject_t& soid = oi.soid;
-  PGBackend::PGTransaction* t = ctx->op_t.get();
+  PGTransaction* t = ctx->op_t.get();
   snapid_t snapid = (uint64_t)op.snap.snapid;
   hobject_t missing_oid;
 
@@ -6422,24 +6301,12 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
       dout(10) << "_rollback_to deleting " << soid.oid
               << " and rolling back to old snap" << dendl;
 
-      if (pool.info.require_rollback()) {
-       if (obs.exists) {
-         if (ctx->mod_desc.rmobject(ctx->at_version.version)) {
-           t->stash(soid, ctx->at_version.version);
-         } else {
-           t->remove(soid);
-         }
-       }
-       replace_cached_attrs(ctx, ctx->obc, rollback_to->attr_cache);
-      } else {
-       if (obs.exists) {
-         ctx->mod_desc.mark_unrollbackable();
-         t->remove(soid);
-       }
+      if (obs.exists) {
+       t->remove(soid);
       }
-      ctx->mod_desc.create();
-      t->clone(rollback_to_sobject, soid);
+      t->clone(soid, rollback_to_sobject);
       snapset.head_exists = true;
+      t->add_obc(rollback_to);
 
       map<snapid_t, interval_set<uint64_t> >::iterator iter =
        snapset.clone_overlap.lower_bound(snapid);
@@ -6459,7 +6326,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
       }
 
       // Adjust the cached objectcontext
-      maybe_create_new_object(ctx);
+      maybe_create_new_object(ctx, true);
       ctx->delta_stats.num_bytes -= obs.oi.size;
       ctx->delta_stats.num_bytes += rollback_to->obs.oi.size;
       obs.oi.size = rollback_to->obs.oi.size;
@@ -6488,7 +6355,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
 
 void ReplicatedPG::_make_clone(
   OpContext *ctx,
-  PGBackend::PGTransaction* t,
+  PGTransaction* t,
   ObjectContextRef obc,
   const hobject_t& head, const hobject_t& coid,
   object_info_t *poi)
@@ -6496,7 +6363,7 @@ void ReplicatedPG::_make_clone(
   bufferlist bv;
   ::encode(*poi, bv, get_osdmap()->get_up_osd_features());
 
-  t->clone(head, coid);
+  t->clone(coid, head);
   setattr_maybe_cache(obc, ctx, t, OI_ATTR, bv);
   rmattr_maybe_cache(obc, ctx, t, SS_ATTR);
 }
@@ -6599,11 +6466,7 @@ void ReplicatedPG::make_writeable(OpContext *ctx)
     snap_oi->copy_user_bits(ctx->obs->oi);
     snap_oi->snaps = snaps;
 
-    // prepend transaction to op_t
-    PGBackend::PGTransaction *t = pgbackend->get_transaction();
-    _make_clone(ctx, t, ctx->clone_obc, soid, coid, snap_oi);
-    t->append(ctx->op_t.get());
-    ctx->op_t.reset(t);
+    _make_clone(ctx, ctx->op_t.get(), ctx->clone_obc, soid, coid, snap_oi);
     
     ctx->delta_stats.num_objects++;
     if (snap_oi->is_dirty()) {
@@ -6633,7 +6496,6 @@ void ReplicatedPG::make_writeable(OpContext *ctx)
                                      ctx->obs->oi.user_version,
                                      osd_reqid_t(), ctx->new_obs.oi.mtime, 0));
     ::encode(snaps, ctx->log.back().snaps);
-    ctx->log.back().mod_desc.create();
 
     ctx->at_version.version++;
   }
@@ -6902,16 +6764,7 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
              ctx->at_version,
              ctx->snapset_obc->obs.oi.version,
              0, osd_reqid_t(), ctx->mtime, 0));
-         if (pool.info.require_rollback()) {
-           if (ctx->log.back().mod_desc.rmobject(ctx->at_version.version)) {
-             ctx->op_t->stash(snapoid, ctx->at_version.version);
-           } else {
-             ctx->op_t->remove(snapoid);
-           }
-         } else {
-           ctx->op_t->remove(snapoid);
-           ctx->log.back().mod_desc.mark_unrollbackable();
-         }
+         ctx->op_t->remove(snapoid);
          dout(10) << " removing old " << snapoid << dendl;
 
          ctx->at_version.version++;
@@ -6950,11 +6803,6 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
       }
       assert(got);
       dout(20) << " got greedy write on snapset_obc " << *ctx->snapset_obc << dendl;
-      if (pool.info.require_rollback() && !ctx->snapset_obc->obs.exists) {
-       ctx->log.back().mod_desc.create();
-      } else if (!pool.info.require_rollback()) {
-       ctx->log.back().mod_desc.mark_unrollbackable();
-      }
       ctx->snapset_obc->obs.exists = true;
       ctx->snapset_obc->obs.oi.version = ctx->at_version;
       ctx->snapset_obc->obs.oi.last_reqid = ctx->reqid;
@@ -6964,18 +6812,10 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
       map<string, bufferlist> attrs;
       bufferlist bv(sizeof(ctx->new_obs.oi));
       ::encode(ctx->snapset_obc->obs.oi, bv, get_osdmap()->get_up_osd_features());
-      ctx->op_t->touch(snapoid);
+      ctx->op_t->create(snapoid);
       attrs[OI_ATTR].claim(bv);
       attrs[SS_ATTR].claim(bss);
       setattrs_maybe_cache(ctx->snapset_obc, ctx, ctx->op_t.get(), attrs);
-      if (pool.info.require_rollback()) {
-       map<string, boost::optional<bufferlist> > to_set;
-       to_set[SS_ATTR];
-       to_set[OI_ATTR];
-       ctx->log.back().mod_desc.setattrs(to_set);
-      } else {
-       ctx->log.back().mod_desc.mark_unrollbackable();
-      }
       ctx->at_version.version++;
     }
   }
@@ -7019,18 +6859,7 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
     } else {
       dout(10) << " no snapset (this is a clone)" << dendl;
     }
-    setattrs_maybe_cache(ctx->obc, ctx, ctx->op_t.get(), attrs);
-
-    if (pool.info.require_rollback()) {
-      set<string> changing;
-      changing.insert(OI_ATTR);
-      if (!soid.is_snap())
-       changing.insert(SS_ATTR);
-      ctx->obc->fill_in_setattrs(changing, &(ctx->mod_desc));
-    } else {
-      // replicated pools are never rollbackable in this case
-      ctx->mod_desc.mark_unrollbackable();
-    }
+    ctx->op_t->setattrs(soid, attrs);
   } else {
     ctx->new_obs.oi = object_info_t(ctx->obc->obs.oi.soid);
   }
@@ -7054,7 +6883,6 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
     }
   }
 
-  ctx->log.back().mod_desc.claim(ctx->mod_desc);
   if (!ctx->extra_reqids.empty()) {
     dout(20) << __func__ << "  extra_reqids " << ctx->extra_reqids << dendl;
     ctx->log.back().extra_reqids.swap(ctx->extra_reqids);
@@ -7534,6 +7362,30 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
 
   assert(cop->rval >= 0);
 
+  if (!cop->temp_cursor.data_complete) {
+    cop->results.data_digest = cop->data.crc32c(cop->results.data_digest);
+  }
+  if (pool.info.supports_omap() && !cop->temp_cursor.omap_complete) {
+    if (cop->omap_header.length()) {
+      cop->results.omap_digest =
+       cop->omap_header.crc32c(cop->results.omap_digest);
+    }
+    if (cop->omap_data.length()) {
+      bufferlist keys;
+      keys.substr_of(cop->omap_data, 4, cop->omap_data.length() - 4);
+      cop->results.omap_digest = keys.crc32c(cop->results.omap_digest);
+    }
+  }
+
+  if (!cop->temp_cursor.attr_complete) {
+    for (map<string,bufferlist>::iterator p = cop->attrs.begin();
+        p != cop->attrs.end();
+        ++p) {
+      cop->results.attrs[string("_") + p->first] = p->second;
+    }
+    cop->attrs.clear();
+  }
+
   if (!cop->cursor.is_complete()) {
     // write out what we have so far
     if (cop->temp_cursor.is_initial()) {
@@ -7554,9 +7406,6 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
     return;
   }
 
-  cop->results.final_tx = pgbackend->get_transaction();
-  _build_finish_copy_transaction(cop, cop->results.final_tx);
-
   // verify digests?
   if (cop->results.is_data_digest() || cop->results.is_omap_digest()) {
     dout(20) << __func__ << std::hex
@@ -7601,6 +7450,21 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
     goto out;
   }
 
+  cop->results.fill_in_final_tx = std::function<void(PGTransaction*)>(
+    [this, &cop /* avoid ref cycle */](PGTransaction *t) {
+      ObjectState& obs = cop->obc->obs;
+      if (cop->temp_cursor.is_initial()) {
+       // write directly to final object
+       cop->results.temp_oid = obs.oi.soid;
+       _write_copy_chunk(cop, t);
+      } else {
+       // finish writing to temp object, then move into place
+       _write_copy_chunk(cop, t);
+       t->rename(obs.oi.soid, cop->results.temp_oid);
+      }
+      t->setattrs(obs.oi.soid, cop->results.attrs);
+    });
+
   dout(20) << __func__ << " success; committing" << dendl;
 
  out:
@@ -7645,7 +7509,7 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
   kick_object_context_blocked(cobc);
 }
 
-void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t)
+void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
 {
   dout(20) << __func__ << " " << cop
           << " " << cop->attrs.size() << " attrs"
@@ -7654,16 +7518,7 @@ void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t)
           << " " << cop->omap_data.length() << " omap data bytes"
           << dendl;
   if (!cop->temp_cursor.attr_complete) {
-    t->touch(cop->results.temp_oid);
-    for (map<string,bufferlist>::iterator p = cop->attrs.begin();
-        p != cop->attrs.end();
-        ++p) {
-      cop->results.attrs[string("_") + p->first] = p->second;
-      t->setattr(
-       cop->results.temp_oid,
-       string("_") + p->first, p->second);
-    }
-    cop->attrs.clear();
+    t->create(cop->results.temp_oid);
   }
   if (!cop->temp_cursor.data_complete) {
     assert(cop->data.length() + cop->temp_cursor.data_offset ==
@@ -7687,31 +7542,25 @@ void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t)
               cop->cursor.data_offset);
       }
     }
-    cop->results.data_digest = cop->data.crc32c(cop->results.data_digest);
-    t->append(
-      cop->results.temp_oid,
-      cop->temp_cursor.data_offset,
-      cop->data.length(),
-      cop->data,
-      cop->dest_obj_fadvise_flags);
+    if (cop->data.length()) {
+      t->write(
+       cop->results.temp_oid,
+       cop->temp_cursor.data_offset,
+       cop->data.length(),
+       cop->data,
+       cop->dest_obj_fadvise_flags);
+    }
     cop->data.clear();
   }
   if (pool.info.supports_omap()) {
     if (!cop->temp_cursor.omap_complete) {
       if (cop->omap_header.length()) {
-       cop->results.omap_digest =
-         cop->omap_header.crc32c(cop->results.omap_digest);
        t->omap_setheader(
          cop->results.temp_oid,
          cop->omap_header);
        cop->omap_header.clear();
       }
       if (cop->omap_data.length()) {
-       // don't checksum the key count prefix
-       bufferlist keys;
-       keys.substr_of(cop->omap_data, 4, cop->omap_data.length() - 4);
-       cop->results.omap_digest = keys.crc32c(cop->results.omap_digest);
-
        map<string,bufferlist> omap;
        bufferlist::iterator p = cop->omap_data.begin();
        ::decode(omap, p);
@@ -7726,54 +7575,22 @@ void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t)
   cop->temp_cursor = cop->cursor;
 }
 
-void ReplicatedPG::_build_finish_copy_transaction(CopyOpRef cop,
-                                                  PGBackend::PGTransaction* t)
-{
-  ObjectState& obs = cop->obc->obs;
-  if (cop->temp_cursor.is_initial()) {
-    // write directly to final object
-    cop->results.temp_oid = obs.oi.soid;
-    _write_copy_chunk(cop, t);
-  } else {
-    // finish writing to temp object, then move into place
-    _write_copy_chunk(cop, t);
-    t->rename(cop->results.temp_oid, obs.oi.soid);
-  }
-}
-
 void ReplicatedPG::finish_copyfrom(OpContext *ctx)
 {
   dout(20) << "finish_copyfrom on " << ctx->obs->oi.soid << dendl;
   ObjectState& obs = ctx->new_obs;
   CopyFromCallback *cb = static_cast<CopyFromCallback*>(ctx->copy_cb);
 
-  if (pool.info.require_rollback()) {
-    if (obs.exists) {
-      if (ctx->mod_desc.rmobject(ctx->at_version.version)) {
-       ctx->op_t->stash(obs.oi.soid, ctx->at_version.version);
-      } else {
-       ctx->op_t->remove(obs.oi.soid);
-      }
-    }
-    ctx->mod_desc.create();
-    replace_cached_attrs(ctx, ctx->obc, cb->results->attrs);
+  if (obs.exists) {
+    ctx->op_t->remove(obs.oi.soid);
   } else {
-    if (obs.exists) {
-      ctx->op_t->remove(obs.oi.soid);
-    }
-    ctx->mod_desc.mark_unrollbackable();
-  }
-
-  if (!obs.exists) {
     ctx->delta_stats.num_objects++;
     obs.exists = true;
   }
   if (cb->is_temp_obj_used()) {
     ctx->discard_temp_oid = cb->results->temp_oid;
   }
-  ctx->op_t->append(cb->results->final_tx);
-  delete cb->results->final_tx;
-  cb->results->final_tx = NULL;
+  cb->results->fill_in_final_tx(ctx->op_t.get());
 
   // CopyFromCallback fills this in for us
   obs.oi.user_version = ctx->user_at_version;
@@ -7942,7 +7759,7 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results,
 
   if (whiteout) {
     // create a whiteout
-    tctx->op_t->touch(soid);
+    tctx->op_t->create(soid);
     tctx->new_obs.oi.set_flag(object_info_t::FLAG_WHITEOUT);
     ++tctx->delta_stats.num_whiteouts;
     dout(20) << __func__ << " creating whiteout on " << soid << dendl;
@@ -7954,9 +7771,7 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results,
       ++tctx->delta_stats.num_objects_omap;
     }
 
-    tctx->op_t->append(results->final_tx);
-    delete results->final_tx;
-    results->final_tx = NULL;
+    results->fill_in_final_tx(tctx->op_t.get());
     if (results->started_temp_obj) {
       tctx->discard_temp_oid = results->temp_oid;
     }
@@ -8719,25 +8534,18 @@ void ReplicatedPG::issue_repop(RepGather *repop, OpContext *ctx)
   }
 
   ctx->obc->ondisk_write_lock();
-  if (ctx->clone_obc)
-    ctx->clone_obc->ondisk_write_lock();
 
   bool unlock_snapset_obc = false;
+  ctx->op_t->add_obc(ctx->obc);
+  if (ctx->clone_obc) {
+    ctx->clone_obc->ondisk_write_lock();
+    ctx->op_t->add_obc(ctx->clone_obc);
+  }
   if (ctx->snapset_obc && ctx->snapset_obc->obs.oi.soid !=
       ctx->obc->obs.oi.soid) {
     ctx->snapset_obc->ondisk_write_lock();
     unlock_snapset_obc = true;
-  }
-
-  ctx->apply_pending_attrs();
-
-  if (pool.info.require_rollback()) {
-    for (vector<pg_log_entry_t>::iterator i = ctx->log.begin();
-        i != ctx->log.end();
-        ++i) {
-      assert(i->mod_desc.can_rollback());
-      assert(!i->mod_desc.empty());
-    }
+    ctx->op_t->add_obc(ctx->snapset_obc);
   }
 
   Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
@@ -8828,7 +8636,7 @@ ReplicatedPG::OpContextUPtr ReplicatedPG::simple_opc_create(ObjectContextRef obc
   ceph_tid_t rep_tid = osd->get_tid();
   osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
   OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, ops, obc, this));
-  ctx->op_t.reset(pgbackend->get_transaction());
+  ctx->op_t.reset(new PGTransaction());
   ctx->mtime = ceph_clock_now(g_ceph_context);
   return ctx;
 }
@@ -9111,7 +8919,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
     });
 
 
-  PGBackend::PGTransaction *t = ctx->op_t.get();
+  PGTransaction *t = ctx->op_t.get();
   ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, obc->obs.oi.soid,
                                    ctx->at_version,
                                    oi.version,
@@ -9122,16 +8930,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
   oi.version = ctx->at_version;
   bufferlist bl;
   ::encode(oi, bl, get_osdmap()->get_up_osd_features());
-  setattr_maybe_cache(obc, ctx.get(), t, OI_ATTR, bl);
-
-  if (pool.info.require_rollback()) {
-    map<string, boost::optional<bufferlist> > to_set;
-    to_set[OI_ATTR] = bl;
-    ctx->log.back().mod_desc.setattrs(to_set);
-  } else {
-    ctx->log.back().mod_desc.mark_unrollbackable();
-  }
-
+  t->setattr(obc->obs.oi.soid, OI_ATTR, bl);
 
   // apply new object state.
   ctx->obc->obs = ctx->new_obs;
@@ -11916,7 +11715,10 @@ void ReplicatedPG::hit_set_persist()
   bufferlist boi(sizeof(ctx->new_obs.oi));
   ::encode(ctx->new_obs.oi, boi, get_osdmap()->get_up_osd_features());
 
-  ctx->op_t->append(oid, 0, bl.length(), bl, 0);
+  ctx->op_t->create(oid);
+  if (bl.length()) {
+    ctx->op_t->write(oid, 0, bl.length(), bl, 0);
+  }
   map <string, bufferlist> attrs;
   attrs[OI_ATTR].claim(boi);
   attrs[SS_ATTR].claim(bss);
@@ -11932,11 +11734,6 @@ void ReplicatedPG::hit_set_persist()
       ctx->mtime,
       0)
     );
-  if (pool.info.require_rollback()) {
-    ctx->log.back().mod_desc.create();
-  } else {
-    ctx->log.back().mod_desc.mark_unrollbackable();
-  }
 
   hit_set_trim(ctx, max);
 
@@ -11967,17 +11764,8 @@ void ReplicatedPG::hit_set_trim(OpContextUPtr &ctx, unsigned max)
                       osd_reqid_t(),
                       ctx->mtime,
                       0));
-    if (pool.info.require_rollback()) {
-      if (ctx->log.back().mod_desc.rmobject(
-         ctx->at_version.version)) {
-       ctx->op_t->stash(oid, ctx->at_version.version);
-      } else {
-       ctx->op_t->remove(oid);
-      }
-    } else {
-      ctx->op_t->remove(oid);
-      ctx->log.back().mod_desc.mark_unrollbackable();
-    }
+
+    ctx->op_t->remove(oid);
     updated_hit_set_hist.history.pop_front();
 
     ObjectContextRef obc = get_object_context(oid, false);
@@ -13376,61 +13164,31 @@ boost::statechart::result ReplicatedPG::WaitingOnReplicas::react(const SnapTrim&
   return transit< NotTrimming >();
 }
 
-void ReplicatedPG::replace_cached_attrs(
-  OpContext *ctx,
-  ObjectContextRef obc,
-  const map<string, bufferlist> &new_attrs)
-{
-  ctx->pending_attrs[obc].clear();
-  for (map<string, bufferlist>::iterator i = obc->attr_cache.begin();
-       i != obc->attr_cache.end();
-       ++i) {
-    ctx->pending_attrs[obc][i->first] = boost::optional<bufferlist>();
-  }
-  for (map<string, bufferlist>::const_iterator i = new_attrs.begin();
-       i != new_attrs.end();
-       ++i) {
-    ctx->pending_attrs[obc][i->first] = i->second;
-  }
-}
-
 void ReplicatedPG::setattr_maybe_cache(
   ObjectContextRef obc,
   OpContext *op,
-  PGBackend::PGTransaction *t,
+  PGTransaction *t,
   const string &key,
   bufferlist &val)
 {
-  if (pool.info.require_rollback()) {
-    op->pending_attrs[obc][key] = val;
-  }
   t->setattr(obc->obs.oi.soid, key, val);
 }
 
 void ReplicatedPG::setattrs_maybe_cache(
   ObjectContextRef obc,
   OpContext *op,
-  PGBackend::PGTransaction *t,
+  PGTransaction *t,
   map<string, bufferlist> &attrs)
 {
-  if (pool.info.require_rollback()) {
-    for (map<string, bufferlist>::iterator it = attrs.begin();
-      it != attrs.end(); ++it) {
-      op->pending_attrs[obc][it->first] = it->second;
-    }
-  }
   t->setattrs(obc->obs.oi.soid, attrs);
 }
 
 void ReplicatedPG::rmattr_maybe_cache(
   ObjectContextRef obc,
   OpContext *op,
-  PGBackend::PGTransaction *t,
+  PGTransaction *t,
   const string &key)
 {
-  if (pool.info.require_rollback()) {
-    op->pending_attrs[obc][key] = boost::optional<bufferlist>();
-  }
   t->rmattr(obc->obs.oi.soid, key);
 }
 
index e02c4668a1a4cd9d1b4d394da9206d92d4e1587a..47e3535deb3bb45753a80f9b5bfb863c1dd1d053 100644 (file)
@@ -25,6 +25,7 @@
 #include "messages/MOSDOpReply.h"
 #include "common/sharedptr_registry.hpp"
 #include "ReplicatedBackend.h"
+#include "PGTransaction.h"
 
 class MOSDSubOpReply;
 
@@ -74,23 +75,26 @@ public:
     uint64_t object_size; ///< the copied object's size
     bool started_temp_obj; ///< true if the callback needs to delete temp object
     hobject_t temp_oid;    ///< temp object (if any)
+
     /**
-     * Final transaction; if non-empty the callback must execute it before any
-     * other accesses to the object (in order to complete the copy).
+     * Function to fill in transaction; if non-empty the callback
+     * must execute it before any other accesses to the object
+     * (in order to complete the copy).
      */
-    PGBackend::PGTransaction *final_tx;
+    std::function<void(PGTransaction *)> fill_in_final_tx;
+
     version_t user_version; ///< The copy source's user version
     bool should_requeue;  ///< op should be requeued on cancel
     vector<snapid_t> snaps;  ///< src's snaps (if clone)
     snapid_t snap_seq;       ///< src's snap_seq (if head)
     librados::snap_set_t snapset; ///< src snapset (if head)
     bool mirror_snapset;
-    map<string, bufferlist> attrs; ///< src user attrs
     bool has_omap;
     uint32_t flags;    // object_copy_data_t::FLAG_*
     uint32_t source_data_digest, source_omap_digest;
     uint32_t data_digest, omap_digest;
     vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
+    map<string, bufferlist> attrs; // xattrs
     uint64_t truncate_seq;
     uint64_t truncate_size;
     bool is_data_digest() {
@@ -101,7 +105,7 @@ public:
     }
     CopyResults()
       : object_size(0), started_temp_obj(false),
-       final_tx(NULL), user_version(0),
+       user_version(0),
        should_requeue(false), mirror_snapset(false),
        has_omap(false),
        flags(0),
@@ -479,7 +483,7 @@ public:
 
     int current_osd_subop_num;
 
-    PGBackend::PGTransactionUPtr op_t;
+    PGTransactionUPtr op_t;
     vector<pg_log_entry_t> log;
     boost::optional<pg_hit_set_history_t> updated_hset_history;
 
@@ -505,10 +509,6 @@ public:
 
     hobject_t new_temp_oid, discard_temp_oid;  ///< temp objects we should start/stop tracking
 
-    // pending xattr updates
-    map<ObjectContextRef,
-       map<string, boost::optional<bufferlist> > > pending_attrs;
-
     list<std::function<void()>> on_applied;
     list<std::function<void()>> on_committed;
     list<std::function<void()>> on_finish;
@@ -533,29 +533,6 @@ public:
     bool sent_ack;
     bool sent_disk;
 
-    void apply_pending_attrs() {
-      for (map<ObjectContextRef,
-            map<string, boost::optional<bufferlist> > >::iterator i =
-            pending_attrs.begin();
-          i != pending_attrs.end();
-          ++i) {
-       if (i->first->obs.exists) {
-         for (map<string, boost::optional<bufferlist> >::iterator j =
-                i->second.begin();
-              j != i->second.end();
-              ++j) {
-           if (j->second)
-             i->first->attr_cache[j->first] = j->second.get();
-           else
-             i->first->attr_cache.erase(j->first);
-         }
-       } else {
-         i->first->attr_cache.clear();
-       }
-      }
-      pending_attrs.clear();
-    }
-
     // pending async reads <off, len, op_flags> -> <outbl, outr>
     list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
              pair<bufferlist*, Context*> > > pending_async_reads;
@@ -568,8 +545,6 @@ public:
       return inflightreads == 0;
     }
 
-    ObjectModDesc mod_desc;
-
     ObjectContext::RWState::State lock_type;
     ObcLockManager lock_manager;
 
@@ -1102,7 +1077,7 @@ protected:
 
   void _make_clone(
     OpContext *ctx,
-    PGBackend::PGTransaction* t,
+    PGTransaction* t,
     ObjectContextRef obc,
     const hobject_t& head, const hobject_t& coid,
     object_info_t *poi);
@@ -1273,7 +1248,7 @@ protected:
                  bool mirror_snapset, unsigned src_obj_fadvise_flags,
                  unsigned dest_obj_fadvise_flags);
   void process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r);
-  void _write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t);
+  void _write_copy_chunk(CopyOpRef cop, PGTransaction *t);
   uint64_t get_copy_chunk_size() const {
     uint64_t size = cct->_conf->osd_copyfrom_max_chunk;
     if (pool.info.requires_aligned_append()) {
@@ -1285,8 +1260,6 @@ protected:
     return size;
   }
   void _copy_some(ObjectContextRef obc, CopyOpRef cop);
-  void _build_finish_copy_transaction(CopyOpRef cop,
-                                      PGBackend::PGTransaction *t);
   void finish_copyfrom(OpContext *ctx);
   void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
   void cancel_copy(CopyOpRef cop, bool requeue);
@@ -1502,7 +1475,7 @@ private:
                             const SnapSet& ss);
   // return true if we're creating a local object, false for a
   // whiteout or no change.
-  bool maybe_create_new_object(OpContext *ctx);
+  void maybe_create_new_object(OpContext *ctx, bool ignore_transaction=false);
   int _delete_oid(OpContext *ctx, bool no_whiteout);
   int _rollback_to(OpContext *ctx, ceph_osd_op& op);
 public:
@@ -1550,25 +1523,21 @@ public:
   void on_shutdown() override;
 
   // attr cache handling
-  void replace_cached_attrs(
-    OpContext *ctx,
-    ObjectContextRef obc,
-    const map<string, bufferlist> &new_attrs);
   void setattr_maybe_cache(
     ObjectContextRef obc,
     OpContext *op,
-    PGBackend::PGTransaction *t,
+    PGTransaction *t,
     const string &key,
     bufferlist &val);
   void setattrs_maybe_cache(
     ObjectContextRef obc,
     OpContext *op,
-    PGBackend::PGTransaction *t,
+    PGTransaction *t,
     map<string, bufferlist> &attrs);
   void rmattr_maybe_cache(
     ObjectContextRef obc,
     OpContext *op,
-    PGBackend::PGTransaction *t,
+    PGTransaction *t,
     const string &key);
   int getattr_maybe_cache(
     ObjectContextRef obc,