]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: add WriteOpContext; some refactoring
authorSage Weil <sage@newdream.net>
Wed, 20 May 2009 16:52:49 +0000 (09:52 -0700)
committerSage Weil <sage@newdream.net>
Wed, 20 May 2009 16:52:49 +0000 (09:52 -0700)
Push as much write state into the WriteOpContext as possible; use
RepGather only for addition items necessary for the op replication.

src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index edf9357937a7c2bd7fa94a5557c47080c5f1711b..a1327f60f9f641c650158e24ad3b65241d2bf856 100644 (file)
@@ -1320,62 +1320,61 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
   return 0;
 }
 
-void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
-                                      sobject_t soid,
-                                      vector<ceph_osd_op>& ops, bufferlist& bl, utime_t mtime,
-                                      bool& exists, __u64& size, object_info_t& oi,
-                                      eversion_t at_version, SnapContext& snapc,
-                                      eversion_t trim_to)
+void ReplicatedPG::prepare_transaction(WriteOpContext *ctx, bool& exists, __u64& size, eversion_t trim_to)
 {
   bufferlist log_bl;
-  eversion_t log_version = at_version;
-  assert(!ops.empty());
+  eversion_t log_version = ctx->at_version;
+  assert(!ctx->ops.empty());
   
-  eversion_t old_version = oi.version;
+  eversion_t old_version = ctx->poi->version;
+
+  sobject_t& soid = ctx->soid;
+  vector<ceph_osd_op>& ops = ctx->ops;
+  object_info_t *poi = ctx->poi;
 
   // apply ops
   bool did_snap = false;
-  bufferlist::iterator bp = bl.begin();
+  bufferlist::iterator bp = ctx->data.begin();
   for (unsigned i=0; i<ops.size(); i++) {
     // clone?
     if (!did_snap && soid.snap &&
        !ceph_osd_op_type_lock(ops[i].op)) {     // is a (non-lock) modification
-      prepare_clone(t, log_bl, reqid, info.stats, soid, size, oi,
-                   at_version, snapc);
+      prepare_clone(ctx->t, log_bl, ctx->reqid, info.stats, soid, size, *poi,
+                   ctx->at_version, ctx->snapc);
       did_snap = true;
     }
-    prepare_simple_op(t, reqid, info.stats, soid, size, exists, oi,
-                     ops, i, bp, snapc);
+    prepare_simple_op(ctx->t, ctx->reqid, info.stats, soid, size, exists, *poi,
+                     ops, i, bp, ctx->snapc);
   }
 
   // finish.
-  oi.version = at_version;
+  poi->version = ctx->at_version;
   if (exists) {
-    oi.version = at_version;
-    oi.prior_version = old_version;
-    oi.last_reqid = reqid;
-    if (mtime != utime_t()) {
-      oi.mtime = mtime;
-      dout(10) << " set mtime to " << oi.mtime << dendl;
+    poi->version = ctx->at_version;
+    poi->prior_version = old_version;
+    poi->last_reqid = ctx->reqid;
+    if (ctx->mtime != utime_t()) {
+      poi->mtime = ctx->mtime;
+      dout(10) << " set mtime to " << poi->mtime << dendl;
     } else {
-      dout(10) << " mtime unchanged at " << oi.mtime << dendl;
+      dout(10) << " mtime unchanged at " << poi->mtime << dendl;
     }
 
-    bufferlist bv(sizeof(oi));
-    ::encode(oi, bv);
-    t.setattr(info.pgid.to_coll(), soid, OI_ATTR, bv);
+    bufferlist bv(sizeof(*poi));
+    ::encode(*poi, bv);
+    ctx->t.setattr(info.pgid.to_coll(), soid, OI_ATTR, bv);
   }
 
   // append to log
   int logopcode = Log::Entry::MODIFY;
   if (!exists)
     logopcode = Log::Entry::DELETE;
-  Log::Entry logentry(logopcode, soid, at_version, old_version, reqid, mtime);
+  Log::Entry logentry(logopcode, soid, ctx->at_version, old_version, ctx->reqid, ctx->mtime);
   add_log_entry(logentry, log_bl);
 
   // write pg info, log to disk
-  write_info(t);
-  append_log(t, log_bl, log_version, trim_to);
+  write_info(ctx->t);
+  append_log(ctx->t, log_bl, log_version, trim_to);
 }
 
 
@@ -1432,22 +1431,23 @@ void ReplicatedPG::apply_repop(RepGather *repop)
   assert(!repop->applied);
 
   Context *oncommit = new C_OSD_ModifyCommit(this, repop);
-  unsigned r = osd->store->apply_transaction(repop->t, oncommit);
+  unsigned r = osd->store->apply_transaction(repop->ctx->t, oncommit);
   if (r)
     dout(-10) << "apply_repop  apply transaction return " << r << " on " << *repop << dendl;
   
   // discard my reference to the buffer
-  repop->op->get_data().clear();
+  repop->ctx->op->get_data().clear();
   
   repop->applied = true;
   
   put_projected_object(repop->pinfo);
+  repop->pinfo = 0;
 
   update_stats();
 
   // any completion stuff to do here?
-  sobject_t soid(repop->op->get_oid(), CEPH_NOSNAP);
-  ceph_osd_op& first = repop->op->ops[0];
+  sobject_t& soid = repop->ctx->soid;
+  ceph_osd_op& first = repop->ctx->ops[0];
 
   switch (first.op) { 
 #if 0
@@ -1486,24 +1486,26 @@ void ReplicatedPG::eval_repop(RepGather *repop)
 {
   dout(10) << "eval_repop " << *repop << dendl;
   
+  MOSDOp *op = (MOSDOp *)repop->ctx->op;
+
   // disk?
   if (repop->can_send_disk()) {
-    if (repop->op->wants_ondisk()) {
+    if (op->wants_ondisk()) {
       // send commit.
-      MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+      MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONDISK);
       dout(10) << " sending commit on " << *repop << " " << reply << dendl;
-      osd->messenger->send_message(reply, repop->op->get_orig_source_inst());
+      osd->messenger->send_message(reply, op->get_orig_source_inst());
       repop->sent_disk = true;
     }
   }
 
   // nvram?
   else if (repop->can_send_nvram()) {
-    if (repop->op->wants_onnvram()) {
+    if (op->wants_onnvram()) {
       // send commit.
-      MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONNVRAM);
+      MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONNVRAM);
       dout(10) << " sending onnvram on " << *repop << " " << reply << dendl;
-      osd->messenger->send_message(reply, repop->op->get_orig_source_inst());
+      osd->messenger->send_message(reply, op->get_orig_source_inst());
       repop->sent_nvram = true;
     }
   }
@@ -1514,11 +1516,11 @@ void ReplicatedPG::eval_repop(RepGather *repop)
     if (!repop->applied)
       apply_repop(repop);
 
-    if (repop->op->wants_ack()) {
+    if (op->wants_ack()) {
       // send ack
-      MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
+      MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
       dout(10) << " sending ack on " << *repop << " " << reply << dendl;
-      osd->messenger->send_message(reply, repop->op->get_orig_source_inst());
+      osd->messenger->send_message(reply, op->get_orig_source_inst());
       repop->sent_ack = true;
     }
 
@@ -1558,7 +1560,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
 
 void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
 {
-  sobject_t soid(repop->op->get_oid(), CEPH_NOSNAP);
+  sobject_t& soid = repop->ctx->soid;
   dout(7) << " issue_repop rep_tid " << repop->rep_tid
           << " o " << soid
           << " to osd" << dest
@@ -1566,35 +1568,29 @@ void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
   
   // forward the write/update/whatever
   int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
-  MOSDSubOp *wr = new MOSDSubOp(repop->op->get_reqid(), info.pgid, soid,
-                               repop->op->ops, repop->noop, acks_wanted,
+  MOSDSubOp *wr = new MOSDSubOp(repop->ctx->reqid, info.pgid, soid,
+                               repop->ctx->ops, repop->noop, acks_wanted,
                                osd->osdmap->get_epoch(), 
-                               repop->rep_tid, repop->at_version);
-  wr->mtime = repop->mtime;
+                               repop->rep_tid, repop->ctx->at_version);
+  wr->mtime = repop->ctx->mtime;
   wr->old_exists = repop->pinfo->exists;
   wr->old_size = repop->pinfo->size;
   wr->old_version = repop->pinfo->oi.version;
   wr->snapset = repop->pinfo->oi.snapset;
-  wr->snapc = repop->snapc;
-  wr->get_data() = repop->op->get_data();   // _copy_ bufferlist
+  wr->snapc = repop->ctx->snapc;
+  wr->get_data() = repop->ctx->op->get_data();   // _copy_ bufferlist
   if (osd->osdmap->get_pg_size(info.pgid) == acting.size())
     wr->pg_trim_to = peers_complete_thru;
   wr->peer_stat = osd->get_my_stat_for(now, dest);
   osd->messenger->send_message(wr, osd->osdmap->get_inst(dest));
 }
 
-ReplicatedPG::RepGather *ReplicatedPG::new_repop(MOSDOp *op, bool noop,
-                                                tid_t rep_tid, 
-                                                ProjectedObjectInfo *pinfo,
-                                                eversion_t nv,
-                                                SnapContext& snapc)
+ReplicatedPG::RepGather *ReplicatedPG::new_repop(WriteOpContext *ctx, ProjectedObjectInfo *pinfo,
+                                                bool noop, tid_t rep_tid)
 {
-  dout(10) << "new_repop rep_tid " << rep_tid << " on " << *op << dendl;
+  dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op << dendl;
 
-  RepGather *repop = new RepGather(op, noop, rep_tid, 
-                                  pinfo,
-                                  nv, info.last_complete,
-                                  snapc);
+  RepGather *repop = new RepGather(ctx, pinfo, noop, rep_tid, info.last_complete);
 
   // initialize gather sets
   for (unsigned i=0; i<acting.size(); i++) {
@@ -1616,7 +1612,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(MOSDOp *op, bool noop,
 void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
                             int fromosd, eversion_t pg_complete_thru)
 {
-  MOSDOp *op = repop->op;
+  MOSDOp *op = (MOSDOp *)repop->ctx->op;
 
   dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *op
           << " result " << result
@@ -1707,7 +1703,11 @@ void ReplicatedPG::put_projected_object(ProjectedObjectInfo *pinfo)
 void ReplicatedPG::op_modify(MOSDOp *op)
 {
   int whoami = osd->get_nodeid();
-  sobject_t soid(op->get_oid(), CEPH_NOSNAP);
+  
+  WriteOpContext *ctx = new WriteOpContext(op, op->ops, op->get_data(),
+                                          sobject_t(op->get_oid(), CEPH_NOSNAP),
+                                          op->get_reqid(), op->get_mtime());
+  sobject_t& soid = ctx->soid;
 
   // balance-reads set?
 #if 0
@@ -1735,66 +1735,71 @@ void ReplicatedPG::op_modify(MOSDOp *op)
     // add to wait queue
     dout(-10) << "preprocess_op waiting for unbalance-reads on " << soid.oid << dendl;
     waiting_for_unbalanced_reads[soid.oid].push_back(op);
+    delete ctx;
     return;
   }
 #endif
 
   // get existing object info
   ProjectedObjectInfo *pinfo = get_projected_object(soid);
+  ctx->poi = &pinfo->oi;
 
   // --- locking ---
 
   // wrlock?
-  if (!op->ops.empty() &&  // except noop; we just want to flush
+  if (!ctx->ops.empty() &&  // except noop; we just want to flush
       block_if_wrlocked(op, pinfo->oi)) {
     put_projected_object(pinfo);
+    delete ctx;
     return; // op will be handled later, after the object unlocks
   }
 
   // dup op?
   bool noop = false;
   const char *opname;
-  if (op->ops.empty()) {
+  if (ctx->ops.empty()) {
     opname = "no-op";
     noop = true;
-  } else if (is_dup(op->get_reqid())) {
-    dout(3) << "op_modify " << op->ops << " dup op " << op->get_reqid()
+  } else if (is_dup(ctx->reqid)) {
+    dout(3) << "op_modify " << ctx->ops << " dup op " << ctx->reqid
              << ", doing WRNOOP" << dendl;
     opname = "no-op";
     noop = true;
   } else
-    opname = ceph_osd_op_name(op->ops[0].op);
+    opname = ceph_osd_op_name(ctx->ops[0].op);
 
 
   // version
-  eversion_t at_version = log.top;
+  ctx->at_version = log.top;
   if (!noop) {
-    at_version.epoch = osd->osdmap->get_epoch();
-    at_version.version++;
-    assert(at_version > info.last_update);
-    assert(at_version > log.top);
+    ctx->at_version.epoch = osd->osdmap->get_epoch();
+    ctx->at_version.version++;
+    assert(ctx->at_version > info.last_update);
+    assert(ctx->at_version > log.top);
   }
 
   // snap
-  SnapContext snapc;
-  snapc.seq = op->get_snap_seq();
-  snapc.snaps = op->get_snaps();
+  ctx->snapc.seq = op->get_snap_seq();
+  ctx->snapc.snaps = op->get_snaps();
 
   // set version in op, for benefit of client and our eventual reply
-  op->set_version(at_version);
+  op->set_version(ctx->at_version);
 
   dout(10) << "op_modify " << opname 
            << " " << soid
-           << " ov " << pinfo->oi.version << " av " << at_version 
-          << " snapc " << snapc
+           << " ov " << pinfo->oi.version << " av " << ctx->at_version 
+          << " snapc " << ctx->snapc
           << " snapset " << pinfo->oi.snapset
            << dendl;  
 
   // verify snap ordering
   if ((op->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) &&
-      snapc.seq < pinfo->oi.snapset.seq) {
-    dout(10) << " ORDERSNAP flag set and snapc seq " << snapc.seq << " < snapset seq " << pinfo->oi.snapset.seq
+      ctx->snapc.seq < pinfo->oi.snapset.seq) {
+    dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq
+            << " < snapset seq " << pinfo->oi.snapset.seq
             << " on " << soid << dendl;
+    put_projected_object(pinfo);
+    delete ctx;
     osd->reply_op_error(op, -EOLDSNAPC);
     return;
   }
@@ -1811,9 +1816,9 @@ void ReplicatedPG::op_modify(MOSDOp *op)
     }
   }
 
-  if (op->get_data().length()) {
+  if (ctx->data.length()) {
     osd->logger->inc(l_osd_c_wr);
-    osd->logger->inc(l_osd_c_wrb, op->get_data().length());
+    osd->logger->inc(l_osd_c_wrb, ctx->data.length());
   }
 
   // note my stats
@@ -1821,11 +1826,11 @@ void ReplicatedPG::op_modify(MOSDOp *op)
 
   // issue replica writes
   tid_t rep_tid = osd->get_tid();
-  RepGather *repop = new_repop(op, noop, rep_tid, pinfo, at_version, snapc);
+  RepGather *repop = new_repop(ctx, pinfo, noop, rep_tid);
   for (unsigned i=1; i<acting.size(); i++)
     issue_repop(repop, acting[i], now);
                                                                
-  eversion_t old_last_update = at_version;
+  eversion_t old_last_update = ctx->at_version;
        
   // trim log?
   eversion_t trim_to = is_clean() ? peers_complete_thru : eversion_t();
@@ -1835,18 +1840,15 @@ void ReplicatedPG::op_modify(MOSDOp *op)
   // we are acker.
   if (!noop) {
     // log and update later.
-    prepare_transaction(repop->t, op->get_reqid(), soid, op->ops, op->get_data(), repop->mtime,
-                       pinfo->exists, pinfo->size, pinfo->oi,
-                       at_version, snapc,
-                       trim_to);
+    prepare_transaction(ctx, pinfo->exists, pinfo->size, trim_to);
   }
   
   // keep peer_info up to date
   for (unsigned i=1; i<acting.size(); i++) {
     Info &in = peer_info[acting[i]];
-    in.last_update = at_version;
+    in.last_update = ctx->at_version;
     if (in.last_complete == old_last_update)
-      in.last_update = at_version;
+      in.last_update = ctx->at_version;
   }
 
   // (logical) local ack.
@@ -1952,10 +1954,16 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
     object_info_t oi(op->poid);
     oi.version = op->old_version;
     oi.snapset = op->snapset;
-    prepare_transaction(t, op->reqid,
-                       op->poid, op->ops, op->get_data(), op->mtime,
-                       op->old_exists, op->old_size, oi, op->version,
-                       op->snapc, op->pg_trim_to);
+
+    WriteOpContext ctx(op, op->ops, op->get_data(), op->poid, op->reqid, op->mtime);
+    ctx.poi = &oi;
+    ctx.at_version = op->version;
+    ctx.snapc = op->snapc;
+    
+    bool exists = op->old_exists;
+    __u64 size = op->old_size;
+
+    prepare_transaction(&ctx, exists, size, op->pg_trim_to);
   }
   
   C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete);
@@ -2635,9 +2643,9 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
     repop_map.erase(repop->rep_tid);
 
     if (requeue) {
-      dout(10) << " requeuing " << *repop->op << dendl;
-      rq.push_back(repop->op);
-      repop->op = 0;
+      dout(10) << " requeuing " << *repop->ctx->op << dendl;
+      rq.push_back(repop->ctx->op);
+      repop->ctx->op = 0;
     }
 
     repop->put();
index b173d93e36dd3f3962f82972016e21ad83cefec1..1814043af8aa68bbd70832e787fc711720d2a7b0 100644 (file)
@@ -58,19 +58,45 @@ public:
   };
 
   /*
-   * gather state on the primary/head while replicating an osd op.
+   * Capture all state associated with a write operation being processed
+   * on the current OSD.
+   */
+  struct WriteOpContext {
+    Message *op;
+    vector<ceph_osd_op>& ops;
+    bufferlist& data;
+    sobject_t soid;
+    osd_reqid_t reqid;
+    utime_t mtime;
+
+    SnapContext snapc;           // writer snap context
+
+    //ProjectedObjectInfo *pinfo;  // projected object state
+    object_info_t *poi;
+
+    eversion_t at_version;       // pg's current version pointer
+    ObjectStore::Transaction t;
+
+    WriteOpContext(Message *_op, vector<ceph_osd_op>& _ops, bufferlist& _data,
+                  sobject_t _soid, osd_reqid_t _reqid, utime_t _mtime) :
+      op(_op), ops(_ops), data(_data), soid(_soid), reqid(_reqid), mtime(_mtime),
+      poi(0) {}
+  };
+
+  /*
+   * State on the PG primary associated with the replicated mutation
    */
   class RepGather {
   public:
     xlist<RepGather*>::item queue_item;
     int nref;
 
-    class MOSDOp *op;
+    WriteOpContext *ctx;
+    ProjectedObjectInfo *pinfo;
+
     tid_t rep_tid;
-    utime_t mtime;
     bool noop;
 
-    ObjectStore::Transaction t;
     bool applied, aborted;
 
     set<int>  waitfor_ack;
@@ -80,29 +106,19 @@ public:
     
     utime_t   start;
     
-    ProjectedObjectInfo *pinfo;
-
-    eversion_t at_version;
-    SnapContext snapc;
-
     eversion_t          pg_local_last_complete;
     map<int,eversion_t> pg_complete_thru;
     
-    RepGather(MOSDOp *o, bool noop_, tid_t rt, 
-             ProjectedObjectInfo *i,
-             eversion_t av, eversion_t lc,
-             SnapContext& sc) :
+    RepGather(WriteOpContext *c, ProjectedObjectInfo *pi, bool noop_, tid_t rt, 
+             eversion_t lc) :
       queue_item(this),
-      nref(1), op(o), rep_tid(rt), 
-      mtime(op->get_mtime()), noop(noop_),
+      nref(1),
+      ctx(c), pinfo(pi),
+      rep_tid(rt), 
+      noop(noop_),
       applied(false), aborted(false),
       sent_ack(false), sent_nvram(false), sent_disk(false),
-      pinfo(i),
-      at_version(av), 
-      snapc(sc),
-      pg_local_last_complete(lc) {
-      mtime = op->get_mtime();
-    }
+      pg_local_last_complete(lc) { }
 
     bool can_send_ack() { 
       return
@@ -129,7 +145,8 @@ public:
     void put() {
       assert(nref > 0);
       if (--nref == 0) {
-       delete op;
+       assert(!pinfo);
+       delete ctx;
        delete this;
        //generic_dout(0) << "deleting " << this << dendl;
       }
@@ -147,10 +164,7 @@ protected:
   void apply_repop(RepGather *repop);
   void eval_repop(RepGather*);
   void issue_repop(RepGather *repop, int dest, utime_t now);
-  RepGather *new_repop(MOSDOp *op, bool noop, tid_t rep_tid,
-                      ProjectedObjectInfo *pinfo,
-                      eversion_t nv,
-                      SnapContext& snapc);
+  RepGather *new_repop(WriteOpContext *ctx, ProjectedObjectInfo *pinfo, bool noop, tid_t rep_tid);
   void repop_ack(RepGather *repop,
                  int result, int ack_type,
                  int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
@@ -204,12 +218,7 @@ protected:
   int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
                        sobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
                        vector<ceph_osd_op>& ops, int opn, bufferlist::iterator& bp, SnapContext& snapc); 
-  void prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
-                          sobject_t poid, 
-                          vector<ceph_osd_op>& ops, bufferlist& bl, utime_t mtime,
-                          bool& exists, __u64& size, object_info_t& oi,
-                          eversion_t at_version, SnapContext& snapc,
-                          eversion_t trim_to);
+  void prepare_transaction(WriteOpContext *ctx, bool& exists, __u64& size, eversion_t trim_to);
   
   friend class C_OSD_ModifyCommit;
   friend class C_OSD_RepModifyCommit;
@@ -278,8 +287,8 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
     //<< " wfnvram=" << repop.waitfor_nvram
       << " wfdisk=" << repop.waitfor_disk;
   out << " pct=" << repop.pg_complete_thru;
-  if (repop.op)
-    out << " op=" << *(repop.op);
+  if (repop.ctx->op)
+    out << " op=" << *(repop.ctx->op);
   out << ")";
   return out;
 }