]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: break apart write stages, transactions
authorSage Weil <sage@newdream.net>
Sat, 23 May 2009 01:17:46 +0000 (18:17 -0700)
committerSage Weil <sage@newdream.net>
Mon, 25 May 2009 20:18:21 +0000 (13:18 -0700)
We break the write preparation into three stages.  First we run the ops
vector and build the op_t transaction.  If it is non-empty, we build a
clone_t transaction to run before it, and a local_t that updates the osd's
PG log and metadata.

Take care to preserve old exists, size, and version values before running
the ops vector as those are clobbered but need to be send to the replica
osds.

src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 4631f9f9db2776462540fe5709bc1639551ac1c8..6a0fc96782fb7a8a64176e747e0eec81375bc65d 100644 (file)
@@ -894,7 +894,7 @@ void ReplicatedPG::_make_clone(ObjectStore::Transaction& t,
   t.setattr(info.pgid.to_coll(), coid, OI_ATTR, bv);
 }
 
-void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, osd_reqid_t reqid, pg_stat_t& stats,
+void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, vector<Log::Entry>& log, osd_reqid_t reqid, pg_stat_t& stats,
                                 sobject_t soid, loff_t old_size, object_info_t& oi,
                                 eversion_t& at_version, SnapContext& snapc)
 {
@@ -944,9 +944,8 @@ void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl,
     dout(10) << "cloning v " << oi.version
             << " to " << coid << " v " << at_version
             << " snaps=" << snaps << dendl;
-    Log::Entry cloneentry(PG::Log::Entry::CLONE, coid, at_version, oi.version, reqid, oi.mtime);
-    ::encode(snaps, cloneentry.snaps);
-    add_log_entry(cloneentry, logbl);
+    log.push_back(Log::Entry(PG::Log::Entry::CLONE, coid, at_version, oi.version, reqid, oi.mtime));
+    ::encode(snaps, log.back().snaps);
 
     at_version.version++;
   }
@@ -1268,7 +1267,6 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
 
 void ReplicatedPG::prepare_transaction(OpContext *ctx, bool& exists, __u64& size, eversion_t trim_to)
 {
-  bufferlist log_bl;
   eversion_t log_version = ctx->at_version;
   assert(!ctx->ops.empty());
   
@@ -1278,22 +1276,23 @@ void ReplicatedPG::prepare_transaction(OpContext *ctx, bool& exists, __u64& size
   vector<ceph_osd_op>& ops = ctx->ops;
   object_info_t *poi = ctx->poi;
 
-  // apply ops
-  bool did_snap = false;
+
+  // prepare the actual mutation
   bufferlist::iterator bp = ctx->data.begin();
-  for (unsigned i=0; i<ops.size(); i++) {
-    // clone?
-    if (!did_snap && soid.snap &&
-       !ceph_osd_op_type_lock(ops[i].op)) {     // is a (non-lock) modification
-      prepare_clone(ctx->t, log_bl, ctx->reqid, info.stats, soid, size, *poi,
-                   ctx->at_version, ctx->snapc);
-      did_snap = true;
-    }
-    prepare_simple_op(ctx->t, ctx->reqid, info.stats, soid, size, exists, *poi,
+  for (unsigned i=0; i<ops.size(); i++)
+    prepare_simple_op(ctx->op_t, ctx->reqid, info.stats, soid, size, exists, *poi,
                      ops, i, bp, ctx->snapc);
-  }
 
-  // finish.
+  // FIXME FIXME
+  if (ctx->op_t.empty())
+    return;
+
+  // clone?
+  if (soid.snap)
+    prepare_clone(ctx->clone_t, ctx->log, ctx->reqid, info.stats, soid, size, *poi,
+                 ctx->at_version, ctx->snapc);
+
+  // finish and log the op.
   poi->version = ctx->at_version;
   if (exists) {
     poi->version = ctx->at_version;
@@ -1308,19 +1307,24 @@ void ReplicatedPG::prepare_transaction(OpContext *ctx, bool& exists, __u64& size
 
     bufferlist bv(sizeof(*poi));
     ::encode(*poi, bv);
-    ctx->t.setattr(info.pgid.to_coll(), soid, OI_ATTR, bv);
+    ctx->op_t.setattr(info.pgid.to_coll(), soid, OI_ATTR, bv);
   }
 
   // append to log
   int logopcode = Log::Entry::MODIFY;
   if (!exists)
     logopcode = Log::Entry::DELETE;
-  Log::Entry logentry(logopcode, soid, ctx->at_version, old_version, ctx->reqid, ctx->mtime);
-  add_log_entry(logentry, log_bl);
+  ctx->log.push_back(Log::Entry(logopcode, soid, ctx->at_version, old_version, ctx->reqid, ctx->mtime));
+
+  // update the local pg, pg log
+  write_info(ctx->local_t);
 
-  // write pg info, log to disk
-  write_info(ctx->t);
-  append_log(ctx->t, log_bl, log_version, trim_to);
+  bufferlist log_bl;
+  for (vector<Log::Entry>::iterator p = ctx->log.begin();
+       p != ctx->log.end();
+       p++)
+    add_log_entry(*p, log_bl);
+  append_log(ctx->local_t, log_bl, log_version, trim_to);
 }
 
 
@@ -1377,12 +1381,19 @@ void ReplicatedPG::apply_repop(RepGather *repop)
   assert(!repop->applied);
 
   Context *oncommit = new C_OSD_ModifyCommit(this, repop);
-  unsigned r = osd->store->apply_transaction(repop->ctx->t, oncommit);
+
+  list<ObjectStore::Transaction*> tls;
+  tls.push_back(&repop->ctx->clone_t);
+  tls.push_back(&repop->ctx->op_t);
+  tls.push_back(&repop->ctx->local_t);
+  unsigned r = osd->store->apply_transactions(tls, oncommit);
   if (r)
     dout(-10) << "apply_repop  apply transaction return " << r << " on " << *repop << dendl;
   
   // discard my reference to the buffer
   repop->ctx->op->get_data().clear();
+  tls.clear();
+  repop->ctx->op_t.clear_data();
   
   repop->applied = true;
   
@@ -1506,7 +1517,8 @@ void ReplicatedPG::eval_repop(RepGather *repop)
 }
 
 
-void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
+void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now,
+                              bool old_exists, __u64 old_size, eversion_t old_version)
 {
   const sobject_t& soid = repop->ctx->poi->soid;
   dout(7) << " issue_repop rep_tid " << repop->rep_tid
@@ -1521,9 +1533,9 @@ void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
                                osd->osdmap->get_epoch(), 
                                repop->rep_tid, repop->ctx->at_version);
   wr->mtime = repop->ctx->mtime;
-  wr->old_exists = repop->obc->exists;
-  wr->old_size = repop->obc->size;
-  wr->old_version = repop->obc->oi.version;
+  wr->old_exists = old_exists;
+  wr->old_size = old_size;
+  wr->old_version = old_version;
   wr->snapset = repop->obc->oi.snapset;
   wr->snapc = repop->ctx->snapc;
   wr->get_data() = repop->ctx->op->get_data();   // _copy_ bufferlist
@@ -1657,7 +1669,7 @@ int ReplicatedPG::find_object_context(object_t oid, snapid_t snapid,
  
   // head?
   if (snapid > hobc->oi.snapset.seq) {
-    dout(10) << "get_object_context  " << head
+    dout(10) << "find_object_context  " << head
             << " want " << snapid << " > snapset seq " << hobc->oi.snapset.seq
             << " -- HIT" << dendl;
     *pobc = hobc;
@@ -1850,6 +1862,11 @@ void ReplicatedPG::op_modify(MOSDOp *op, ObjectContext *obc)
   if (log.top.version - log.bottom.version > info.stats.num_objects)
     trim_to = peers_complete_thru;
 
+  // note some basic context for op replication that prepare_transaction may clobber
+  bool old_exists = obc->exists;
+  __u64 old_size = obc->size;
+  eversion_t old_version = obc->oi.version;
+
   // we are acker.
   if (!noop) {
     // log and update later.
@@ -1857,7 +1874,7 @@ void ReplicatedPG::op_modify(MOSDOp *op, ObjectContext *obc)
   }
 
   for (unsigned i=1; i<acting.size(); i++)
-    issue_repop(repop, acting[i], now);
+    issue_repop(repop, acting[i], now, old_exists, old_size, old_version);
   
   // keep peer_info up to date
   for (unsigned i=1; i<acting.size(); i++) {
index 1da5309fccadf898192a14fc65bd9cf687f44b42..d97ba4ce0e2e3248d5be5be0ff5a0a6214937170 100644 (file)
@@ -224,7 +224,9 @@ public:
     utime_t mtime;
     SnapContext snapc;           // writer snap context
     eversion_t at_version;       // pg's current version pointer
-    ObjectStore::Transaction t;
+
+    ObjectStore::Transaction op_t, clone_t, local_t;
+    vector<PG::Log::Entry> log;
 
     int data_off;        // FIXME: we may want to kill this msgr hint off at some point!
 
@@ -314,7 +316,8 @@ protected:
 
   void apply_repop(RepGather *repop);
   void eval_repop(RepGather*);
-  void issue_repop(RepGather *repop, int dest, utime_t now);
+  void issue_repop(RepGather *repop, int dest, utime_t now,
+                  bool old_exists, __u64 old_size, eversion_t old_version);
   RepGather *new_repop(OpContext *ctx, ObjectContext *obc, bool noop, tid_t rep_tid);
   void repop_ack(RepGather *repop,
                  int result, int ack_type,
@@ -363,7 +366,7 @@ protected:
   void _make_clone(ObjectStore::Transaction& t,
                   sobject_t head, sobject_t coid,
                   eversion_t ov, eversion_t v, osd_reqid_t& reqid, utime_t mtime, vector<snapid_t>& snaps);
-  void prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, osd_reqid_t reqid, pg_stat_t& st,
+  void prepare_clone(ObjectStore::Transaction& t, vector<Log::Entry>& log, osd_reqid_t reqid, pg_stat_t& st,
                     sobject_t poid, loff_t old_size, object_info_t& oi,
                     eversion_t& at_version, SnapContext& snapc);
   void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st);