]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rados: refactored to use MOSDSubOp; ripped out splay, chain support for now
authorSage Weil <sage@newdream.net>
Thu, 20 Dec 2007 20:27:07 +0000 (12:27 -0800)
committerSage Weil <sage@newdream.net>
Thu, 20 Dec 2007 20:27:07 +0000 (12:27 -0800)
13 files changed:
src/messages/MOSDOpReply.h
src/messages/MOSDSubOp.h
src/messages/MOSDSubOpReply.h
src/msg/Message.cc
src/msg/Message.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/RAID4PG.cc
src/osd/RAID4PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 0a39395565395aa4375b78e363c21fd93466cbbd..be22f44b26665876f4dc0b332dd0e77776263277 100644 (file)
@@ -51,8 +51,6 @@ class MOSDOpReply : public Message {
     eversion_t pg_complete_thru;
     
     epoch_t map_epoch;
-
-    osd_peer_stat_t peer_stat;
   } st;
 
   map<string,bufferptr> attrset;
@@ -86,9 +84,6 @@ class MOSDOpReply : public Message {
   void set_op(int op) { st.op = op; }
   void set_rep_tid(tid_t t) { st.rep_tid = t; }
 
-  void set_peer_stat(const osd_peer_stat_t& stat) { st.peer_stat = stat; }
-  const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; }
-
   // osdmap
   epoch_t get_map_epoch() { return st.map_epoch; }
 
index 9f4e77bb8c8e0e6dd74a250f13a634ce62d4a230..37a70a120c55431ae96871b1b5874f2cf276835e 100644 (file)
@@ -79,20 +79,19 @@ public:
   const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; }
  
   MOSDSubOp(osdreqid_t r, pg_t p, pobject_t po, int o, off_t of, off_t le,
-           epoch_t mape, tid_t rtid, eversion_t v, evertsion_t pgtt) :
-    Message(CEPH_MSG_OSD_SUBOP) {
+           epoch_t mape, tid_t rtid, eversion_t v) :
+    Message(MSG_OSD_SUBOP) {
     memset(&st, 0, sizeof(st));
     st.reqid = r;
 
     st.pgid = p;
     st.poid = po;
-    st.o = op;
-    st.of = offset;
-    st.le = length;
+    st.op = o;
+    st.offset = of;
+    st.length = le;
     st.map_epoch = mape;
     st.rep_tid = rtid;
     st.version = v;
-    st.pg_trim_to = pgtt;
   }
   MOSDSubOp() {}
 
@@ -112,7 +111,7 @@ public:
   virtual char *get_type_name() { return "osd_sub_op"; }
   void print(ostream& out) {
     out << "osd_sub_op(" << st.reqid
-       << " " << get_opname(st.op)
+       << " " << MOSDOp::get_opname(st.op)
        << " " << st.poid
        << " v" << st.version;    
     if (st.length) out << " " << st.offset << "~" << st.length;
index c65f9f5614953567cea0b52136ce3471c8745a16..056e2dd4656a4905a032aa49b595cd6e0373eaee 100644 (file)
@@ -34,6 +34,7 @@ class MOSDSubOpReply : public Message {
     epoch_t map_epoch;
 
     // subop metadata
+    osdreqid_t reqid;
     pg_t pgid;
     tid_t rep_tid;
     int32_t op;
@@ -75,8 +76,9 @@ class MOSDSubOpReply : public Message {
 
 public:
   MOSDSubOpReply(MOSDSubOp *req, int result, epoch_t e, bool commit) :
-    Message(CEPH_MSG_OSD_OPREPLY) {
+    Message(MSG_OSD_SUBOPREPLY) {
     st.map_epoch = e;
+    st.reqid = req->get_reqid();
     st.pgid = req->get_pg();
     st.rep_tid = req->get_rep_tid();
     st.op = req->get_op();
@@ -104,9 +106,9 @@ public:
   virtual char *get_type_name() { return "osd_op_reply"; }
   
   void print(ostream& out) {
-    out << "osd_op_reply(" << st.reqid
+    out << "osd_sub_op_reply(" << st.reqid
        << " " << MOSDOp::get_opname(st.op)
-       << " " << st.oid;
+       << " " << st.poid;
     if (st.length) out << " " << st.offset << "~" << st.length;
     if (st.op >= 10) {
       if (st.commit)
index 2ee7e0faa9e0eebb0511d5f08ea6611bf14a4de1..ed4ebb8487e41e1057b2834ab013c6a251fb89e5 100644 (file)
@@ -175,10 +175,10 @@ decode_message(ceph_msg_header& env, bufferlist& front, bufferlist& data)
   case CEPH_MSG_OSD_OPREPLY:
     m = new MOSDOpReply();
     break;
-  case CEPH_MSG_OSD_SUBOP:
+  case MSG_OSD_SUBOP:
     m = new MOSDSubOp();
     break;
-  case CEPH_MSG_OSD_SUBOPREPLY:
+  case MSG_OSD_SUBOPREPLY:
     m = new MOSDSubOpReply();
     break;
 
index 6f0b3f1b72d3d7cbdb5a5accd78b0fd737265060..e3b8612b10e34a8ab1b2721ec98a63857b0179af 100644 (file)
@@ -34,6 +34,7 @@
 #define MSG_OSD_OUT          74
 
 #define MSG_OSD_SUBOP        75
+#define MSG_OSD_SUBOPREPLY   76
 
 #define MSG_OSD_PG_NOTIFY      80
 #define MSG_OSD_PG_QUERY       81
index e3344618da7b95cddf61e4a9fb8c789cf68295d6..eeb8dc5c5f4563b0057268e564b2af32ac773c7d 100644 (file)
@@ -44,6 +44,8 @@
 #include "messages/MOSDFailure.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
+#include "messages/MOSDSubOp.h"
+#include "messages/MOSDSubOpReply.h"
 #include "messages/MOSDBoot.h"
 #include "messages/MOSDIn.h"
 #include "messages/MOSDOut.h"
@@ -965,7 +967,6 @@ void OSD::dispatch(Message *m)
       switch (m->get_type()) {
 
       case MSG_OSD_PING:
-        // take note.
         handle_osd_ping((MOSDPing*)m);
         break;
         
@@ -985,13 +986,17 @@ void OSD::dispatch(Message *m)
         handle_pg_activate_set((MOSDPGActivateSet*)m);
         break;
 
+       // client ops
       case CEPH_MSG_OSD_OP:
         handle_op((MOSDOp*)m);
         break;
         
         // for replication etc.
-      case CEPH_MSG_OSD_OPREPLY:
-        handle_op_reply((MOSDOpReply*)m);
+      case MSG_OSD_SUBOP:
+       handle_sub_op((MOSDSubOp*)m);
+       break;
+      case MSG_OSD_SUBOPREPLY:
+        handle_sub_op_reply((MOSDSubOpReply*)m);
         break;
         
         
@@ -2301,8 +2306,10 @@ void OSD::handle_op(MOSDOp *op)
     // do it now.
     if (op->get_type() == CEPH_MSG_OSD_OP)
       pg->do_op((MOSDOp*)op);
-    else if (op->get_type() == CEPH_MSG_OSD_OPREPLY)
-      pg->do_op_reply((MOSDOpReply*)op);
+    else if (op->get_type() == MSG_OSD_SUBOP)
+      pg->do_sub_op((MOSDSubOp*)op);
+    else if (op->get_type() == MSG_OSD_SUBOPREPLY)
+      pg->do_sub_op_reply((MOSDSubOpReply*)op);
     else 
       assert(0);
   } else {
@@ -2314,7 +2321,42 @@ void OSD::handle_op(MOSDOp *op)
 }
 
 
-void OSD::handle_op_reply(MOSDOpReply *op)
+void OSD::handle_sub_op(MOSDSubOp *op)
+{
+  dout(10) << "handle_sub_op " << *op << " epoch " << op->get_map_epoch() << dendl;
+  if (op->get_map_epoch() < boot_epoch) {
+    dout(3) << "replica op from before boot" << dendl;
+    delete op;
+    return;
+  }
+
+  // must be a rep op.
+  assert(op->get_source().is_osd());
+  
+  // make sure we have the pg
+  const pg_t pgid = op->get_pg();
+
+  // require same or newer map
+  if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
+
+  // share our map with sender, if they're old
+  _share_map_incoming(op->get_source_inst(), op->get_map_epoch());
+
+  if (!_have_pg(pgid)) {
+    // hmm.
+    delete op;
+    return;
+  } 
+
+  PG *pg = _lookup_lock_pg(pgid);
+  if (g_conf.osd_maxthreads < 1) {
+    pg->do_sub_op(op);    // do it now
+  } else {
+    enqueue_op(pg, op);     // queue for worker threads
+  }
+  pg->unlock();
+}
+void OSD::handle_sub_op_reply(MOSDSubOpReply *op)
 {
   if (op->get_map_epoch() < boot_epoch) {
     dout(3) << "replica op reply from before boot" << dendl;
@@ -2342,7 +2384,7 @@ void OSD::handle_op_reply(MOSDOpReply *op)
 
   PG *pg = _lookup_lock_pg(pgid);
   if (g_conf.osd_maxthreads < 1) {
-    pg->do_op_reply(op);    // do it now
+    pg->do_sub_op_reply(op);    // do it now
   } else {
     enqueue_op(pg, op);     // queue for worker threads
   }
@@ -2397,8 +2439,10 @@ void OSD::dequeue_op(PG *pg)
   // do it
   if (op->get_type() == CEPH_MSG_OSD_OP)
     pg->do_op((MOSDOp*)op); // do it now
-  else if (op->get_type() == CEPH_MSG_OSD_OPREPLY)
-    pg->do_op_reply((MOSDOpReply*)op);
+  else if (op->get_type() == MSG_OSD_SUBOP)
+    pg->do_sub_op((MOSDSubOp*)op);
+  else if (op->get_type() == MSG_OSD_SUBOPREPLY)
+    pg->do_sub_op_reply((MOSDSubOpReply*)op);
   else 
     assert(0);
 
index f49f22e1aef95116c2a67a7c9f0fdcef3f19cde5..73e78a95ff5200c900dfc53892a57fba48739c7a 100644 (file)
@@ -362,7 +362,8 @@ private:
 
   void handle_osd_ping(class MOSDPing *m);
   void handle_op(class MOSDOp *m);
-  void handle_op_reply(class MOSDOpReply *m);
+  void handle_sub_op(class MOSDSubOp *m);
+  void handle_sub_op_reply(class MOSDSubOpReply *m);
 
   void force_remount();
 };
index dbdf9fff0c89738a13e0472bf7b17ff763dba57a..25a64a8ba101be10f3d107d902670baf755ad31f 100644 (file)
@@ -1289,3 +1289,6 @@ bool PG::pick_object_rev(object_t& oid)
 
 
 
+
+
+
index 16978922f9a06828a5394e04a33abc03294f0078..b41f8d061de12ba8e9938c2c19a44ccf6811bab3 100644 (file)
@@ -38,6 +38,8 @@ using namespace __gnu_cxx;
 class OSD;
 class MOSDOp;
 class MOSDOpReply;
+class MOSDSubOp;
+class MOSDSubOpReply;
 class MOSDPGActivateSet;
 
 /** PG - Replica Placement Group
@@ -493,7 +495,6 @@ protected:
   map<object_t, eversion_t> objects_pulling;  // which objects are currently being pulled
   
 
-
   // stats
   off_t stat_size;
   off_t stat_num_blocks;
@@ -645,14 +646,15 @@ public:
   // abstract bits
   virtual bool preprocess_op(MOSDOp *op, utime_t now) { return false; } 
   virtual void do_op(MOSDOp *op) = 0;
-  virtual void do_op_reply(MOSDOpReply *op) = 0;
+  virtual void do_sub_op(MOSDSubOp *op) = 0;
+  virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0;
 
   virtual bool same_for_read_since(epoch_t e) = 0;
   virtual bool same_for_modify_since(epoch_t e) = 0;
   virtual bool same_for_rep_modify_since(epoch_t e) = 0;
 
   virtual bool is_missing_object(object_t oid) = 0;
-  virtual void wait_for_missing_object(object_t oid, MOSDOp *op) = 0;
+  virtual void wait_for_missing_object(object_t oid, Message *op) = 0;
 
   virtual void on_osd_failure(int osd) = 0;
   virtual void on_acker_change() = 0;
index 62eb6d05501cd34a16f9ab3e0276034f5bc36fc0..740aa1d5585ab2df5bc2d7334b2c3b97c2ef535d 100644 (file)
@@ -70,7 +70,12 @@ void RAID4PG::do_op(MOSDOp *op)
 
 }
 
-void RAID4PG::do_op_reply(MOSDOpReply *reply)
+void RAID4PG::do_sub_op(MOSDSubOp *op)
+{
+
+}
+
+void RAID4PG::do_sub_op_reply(MOSDSubOpReply *reply)
 {
 
 }
@@ -104,7 +109,7 @@ bool RAID4PG::is_missing_object(object_t oid)
   return false;
 }
 
-void RAID4PG::wait_for_missing_object(object_t oid, MOSDOp *op)
+void RAID4PG::wait_for_missing_object(object_t oid, Message *op)
 {
   //assert(0);
 }
index 0469f867f7e95235c4c9922de6ff91fe944d86fe..010371a25fd921f0cfd07993d7799ab307124be2 100644 (file)
@@ -52,14 +52,15 @@ public:
 
   bool preprocess_op(MOSDOp *op, utime_t now);
   void do_op(MOSDOp *op);
-  void do_op_reply(MOSDOpReply *r);
+  void do_sub_op(MOSDSubOp *op);
+  void do_sub_op_reply(MOSDSubOpReply *r);
 
   bool same_for_read_since(epoch_t e);
   bool same_for_modify_since(epoch_t e);
   bool same_for_rep_modify_since(epoch_t e);
 
   bool is_missing_object(object_t oid);
-  void wait_for_missing_object(object_t oid, MOSDOp *op);
+  void wait_for_missing_object(object_t oid, Message *op);
 
   void on_osd_failure(int o);
   void on_acker_change();
index 78d63a2a8486dea67e3db0619fca881414062206..e84b30f93ea71ed834949e1e783a0fd0de880204 100644 (file)
@@ -19,6 +19,8 @@
 
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
+#include "messages/MOSDSubOp.h"
+#include "messages/MOSDSubOpReply.h"
 
 #include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGRemove.h"
@@ -54,14 +56,8 @@ bool ReplicatedPG::same_for_modify_since(epoch_t e)
 bool ReplicatedPG::same_for_rep_modify_since(epoch_t e)
 {
   // check osd map: same set, or primary+acker?
-
-  if (g_conf.osd_rep == OSD_REP_CHAIN) {
-    return e >= info.history.same_since;   // whole pg set same
-  } else {
-    // primary, splay
-    return (e >= info.history.same_primary_since &&
-           e >= info.history.same_acker_since);    
-  }
+  return (e >= info.history.same_primary_since &&
+         e >= info.history.same_acker_since);    
 }
 
 // ====================
@@ -73,7 +69,7 @@ bool ReplicatedPG::is_missing_object(object_t oid)
 }
  
 
-void ReplicatedPG::wait_for_missing_object(object_t oid, MOSDOp *op)
+void ReplicatedPG::wait_for_missing_object(object_t oid, Message *m)
 {
   assert(is_missing_object(oid));
 
@@ -93,11 +89,11 @@ void ReplicatedPG::wait_for_missing_object(object_t oid, MOSDOp *op)
            << dendl;
     pull(oid);
   }
-  waiting_for_missing_object[oid].push_back(op);
+  waiting_for_missing_object[oid].push_back(m);
 }
 
 
-
+// ==========================================================
 
 /** preprocess_op - preprocess an op (before it gets queued).
  * fasttrack read
@@ -364,12 +360,43 @@ void ReplicatedPG::do_op(MOSDOp *op)
     op_read(op);
     break;
     
+    // writes
+  case CEPH_OSD_OP_WRNOOP:
+  case CEPH_OSD_OP_WRITE:
+  case CEPH_OSD_OP_ZERO:
+  case CEPH_OSD_OP_DELETE:
+  case CEPH_OSD_OP_TRUNCATE:
+  case CEPH_OSD_OP_WRLOCK:
+  case CEPH_OSD_OP_WRUNLOCK:
+  case CEPH_OSD_OP_RDLOCK:
+  case CEPH_OSD_OP_RDUNLOCK:
+  case CEPH_OSD_OP_UPLOCK:
+  case CEPH_OSD_OP_DNLOCK:
+  case CEPH_OSD_OP_BALANCEREADS:
+  case CEPH_OSD_OP_UNBALANCEREADS:
+    op_modify(op);
+    break;
+    
+  default:
+    assert(0);
+  }
+}
+
+
+void ReplicatedPG::do_sub_op(MOSDSubOp *op)
+{
+  dout(15) << "do_sub_op " << *op << dendl;
+
+  osd->logger->inc("subop");
+
+  switch (op->get_op()) {
+    
     // rep stuff
   case CEPH_OSD_OP_PULL:
-    op_pull(op);
+    sub_op_pull(op);
     break;
   case CEPH_OSD_OP_PUSH:
-    op_push(op);
+    sub_op_push(op);
     break;
     
     // writes
@@ -386,48 +413,27 @@ void ReplicatedPG::do_op(MOSDOp *op)
   case CEPH_OSD_OP_DNLOCK:
   case CEPH_OSD_OP_BALANCEREADS:
   case CEPH_OSD_OP_UNBALANCEREADS:
-    if (op->get_source().is_osd()) {
-      op_rep_modify(op);
-    } else {
-      // go go gadget pg
-      op_modify(op);
-    }
+    sub_op_modify(op);
     break;
     
   default:
     assert(0);
   }
+
 }
 
-void ReplicatedPG::do_op_reply(MOSDOpReply *r)
+void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r)
 {
   if (r->get_op() == CEPH_OSD_OP_PUSH) {
     // continue peer recovery
-    op_push_reply(r);
+    sub_op_push_reply(r);
   } else {
-    // must be replication.
-    tid_t rep_tid = r->get_rep_tid();
-    int fromosd = r->get_source().num();
-
-    osd->take_peer_stat(fromosd, r->get_peer_stat());
-
-    if (rep_gather.count(rep_tid)) {
-      // oh, good.
-      repop_ack(rep_gather[rep_tid], 
-               r->get_result(), r->get_commit(), 
-               fromosd, 
-               r->get_pg_complete_thru());
-      delete r;
-    } else {
-      // early ack.
-      waiting_for_repop[rep_tid].push_back(r);
-    }
+    sub_op_modify_reply(r);
   }
 }
 
 
 
-
 // ========================================================================
 // READS
 
@@ -561,30 +567,28 @@ void ReplicatedPG::op_read(MOSDOp *op)
 // MODIFY
 
 void ReplicatedPG::prepare_log_transaction(ObjectStore::Transaction& t, 
-                                          MOSDOp *op, eversion_t& version, 
+                                          osdreqid_t reqid, pobject_t poid, int op, eversion_t version,
                                           objectrev_t crev, objectrev_t rev,
                                           eversion_t trim_to)
 {
-  const object_t oid = op->get_oid();
-
   // clone entry?
   if (crev && rev && rev > crev) {
     eversion_t cv = version;
     cv.version--;
-    Log::Entry cloneentry(PG::Log::Entry::CLONE, oid, cv, op->get_reqid());
+    Log::Entry cloneentry(PG::Log::Entry::CLONE, poid.oid, cv, reqid);
     log.add(cloneentry);
 
-    dout(10) << "prepare_log_transaction " << op->get_op()
+    dout(10) << "prepare_log_transaction " << op
             << " " << cloneentry
             << dendl;
   }
 
   // actual op
   int opcode = Log::Entry::MODIFY;
-  if (op->get_op() == CEPH_OSD_OP_DELETE) opcode = Log::Entry::DELETE;
-  Log::Entry logentry(opcode, oid, version, op->get_reqid());
+  if (op == CEPH_OSD_OP_DELETE) opcode = Log::Entry::DELETE;
+  Log::Entry logentry(opcode, poid.oid, version, reqid);
 
-  dout(10) << "prepare_log_transaction " << op->get_op()
+  dout(10) << "prepare_log_transaction " << op
            << " " << logentry
            << dendl;
 
@@ -602,24 +606,22 @@ void ReplicatedPG::prepare_log_transaction(ObjectStore::Transaction& t,
 /** prepare_op_transaction
  * apply an op to the store wrapped in a transaction.
  */
-void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, 
-                                         MOSDOp *op, eversion_t& version, 
-                                         objectrev_t crev, objectrev_t rev)
+void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, const osdreqid_t& reqid,
+                                         pg_t pgid, int op, pobject_t poid, 
+                                         off_t offset, off_t length, bufferlist& bl,
+                                         eversion_t& version, objectrev_t crev, objectrev_t rev)
 {
-  const object_t oid = op->get_oid();
-  const pg_t pgid = op->get_pg();
-
   bool did_clone = false;
 
-  dout(10) << "prepare_op_transaction " << MOSDOp::get_opname( op->get_op() )
-           << " " << oid 
+  dout(10) << "prepare_op_transaction " << MOSDOp::get_opname( op )
+           << " " << poid 
            << " v " << version
           << " crev " << crev
           << " rev " << rev
            << dendl;
   
   // WRNOOP does nothing.
-  if (op->get_op() == CEPH_OSD_OP_WRNOOP) 
+  if (op == CEPH_OSD_OP_WRNOOP) 
     return;
 
   // raise last_complete?
@@ -635,45 +637,46 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t,
 
   // clone?
   if (crev && rev && rev > crev) {
-    object_t noid = oid;
-    noid.rev = rev;
-    dout(10) << "prepare_op_transaction cloning " << oid << " crev " << crev << " to " << noid << dendl;
-    t.clone(oid, noid);
+    assert(0);
+    pobject_t noid = poid;  // FIXME ****
+    noid.oid.rev = rev;
+    dout(10) << "prepare_op_transaction cloning " << poid << " crev " << crev << " to " << noid << dendl;
+    t.clone(poid, noid);
     did_clone = true;
   }  
 
   // apply the op
-  switch (op->get_op()) {
+  switch (op) {
 
     // -- locking --
 
   case CEPH_OSD_OP_WRLOCK:
     { // lock object
-      t.setattr(oid, "wrlock", &op->get_client(), sizeof(entity_name_t));
+      t.setattr(poid, "wrlock", &reqid.name, sizeof(entity_name_t));
     }
     break;  
   case CEPH_OSD_OP_WRUNLOCK:
     { // unlock objects
-      t.rmattr(oid, "wrlock");
+      t.rmattr(poid, "wrlock");
     }
     break;
 
   case CEPH_OSD_OP_MININCLOCK:
     {
-      uint32_t mininc = op->get_length();
-      t.setattr(oid, "mininclock", &mininc, sizeof(mininc));
+      uint32_t mininc = length;
+      t.setattr(poid, "mininclock", &mininc, sizeof(mininc));
     }
     break;
 
   case CEPH_OSD_OP_BALANCEREADS:
     {
       bool bal = true;
-      t.setattr(oid, "balance-reads", &bal, sizeof(bal));
+      t.setattr(poid, "balance-reads", &bal, sizeof(bal));
     }
     break;
   case CEPH_OSD_OP_UNBALANCEREADS:
     {
-      t.rmattr(oid, "balance-reads");
+      t.rmattr(poid, "balance-reads");
     }
     break;
 
@@ -682,12 +685,10 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t,
 
   case CEPH_OSD_OP_WRITE:
     { // write
-      assert(op->get_data().length() == op->get_length());
-      bufferlist bl;
-      bl.claim( op->get_data() );  // give buffers to store; we keep *op in memory for a long time!
-      
-      //if (oid < 100000000000000ULL)  // hack hack-- don't write client data
-      t.write( oid, op->get_offset(), op->get_length(), bl );
+      assert(bl.length() == length);
+      bufferlist nbl;
+      nbl.claim(bl);    // give buffers to store; we keep *op in memory for a long time!
+      t.write(poid, offset, length, nbl);
     }
     break;
     
@@ -695,38 +696,28 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t,
     {
       // zero, remove, or truncate?
       struct stat st;
-      int r = osd->store->stat(oid, &st);
+      int r = osd->store->stat(poid, &st);
       if (r >= 0) {
-       if (op->get_length() == 0 ||
-           op->get_offset() + (off_t)op->get_length() >= (off_t)st.st_size) {
-         if (op->get_offset()) 
-           t.truncate(oid, op->get_length() + op->get_offset());
-         else
-           t.remove(oid);
-       } else {
-         // zero.  the dumb way.  FIXME.
-         bufferptr bp(op->get_length());
-         bp.zero();
-         bufferlist bl;
-         bl.push_back(bp);
-         t.write(oid, op->get_offset(), op->get_length(), bl);
-       }
+       if (offset == 0 && offset + length >= (off_t)st.st_size) 
+         t.remove(poid);
+       else
+         t.zero(poid, offset, length);
       } else {
        // noop?
-       dout(10) << "apply_transaction zero on " << oid << ", but dne?  stat returns " << r << dendl;
+       dout(10) << "apply_transaction zero on " << poid << ", but dne?  stat returns " << r << dendl;
       }
     }
     break;
 
   case CEPH_OSD_OP_TRUNCATE:
     { // truncate
-      t.truncate(oid, op->get_length() );
+      t.truncate(poid, length);
     }
     break;
     
   case CEPH_OSD_OP_DELETE:
     { // delete
-      t.remove(oid);
+      t.remove(poid);
     }
     break;
     
@@ -735,20 +726,20 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t,
   }
   
   // object collection, version
-  if (op->get_op() == CEPH_OSD_OP_DELETE) {
+  if (op == CEPH_OSD_OP_DELETE) {
     // remove object from c
-    t.collection_remove(pgid, oid);
+    t.collection_remove(pgid, poid);
   } else {
     // add object to c
-    t.collection_add(pgid, oid);
+    t.collection_add(pgid, poid);
     
     // object version
-    t.setattr(oid, "version", &version, sizeof(version));
+    t.setattr(poid, "version", &version, sizeof(version));
 
     // set object crev
     if (crev == 0 ||   // new object
        did_clone)     // we cloned
-      t.setattr(oid, "crev", &rev, sizeof(rev));
+      t.setattr(poid, "crev", &rev, sizeof(rev));
   }
 }
 
@@ -899,42 +890,30 @@ void ReplicatedPG::put_rep_gather(RepGather *repop)
 }
 
 
-void ReplicatedPG::issue_repop(MOSDOp *op, int dest, utime_t now)
+void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
 {
-  object_t oid = op->get_oid();
-  
-  dout(7) << " issue_repop rep_tid " << op->get_rep_tid()
-          << " o " << oid
+  pobject_t poid = repop->op->get_oid();
+  dout(7) << " issue_repop rep_tid " << repop->rep_tid
+          << " o " << poid
           << " to osd" << dest
           << dendl;
   
   // forward the write/update/whatever
-  MOSDOp *wr = new MOSDOp(op->get_client_inst(), op->get_client_inc(), op->get_reqid().tid,
-                          oid,
-                          ObjectLayout(info.pgid),
-                          osd->osdmap->get_epoch(),
-                          op->get_op());
-  wr->get_data() = op->get_data();   // _copy_ bufferlist
-  wr->set_length(op->get_length());
-  wr->set_offset(op->get_offset());
-  wr->set_version(op->get_version());
-  
-  wr->set_rep_tid(op->get_rep_tid());
+  MOSDSubOp *wr = new MOSDSubOp(repop->op->get_reqid(), info.pgid, poid,
+                               repop->op->get_op(), 
+                               repop->op->get_offset(), repop->op->get_length(), 
+                               osd->osdmap->get_epoch(), 
+                               repop->rep_tid, repop->new_version);
+  wr->get_data() = repop->op->get_data();   // _copy_ bufferlist
   wr->set_pg_trim_to(peers_complete_thru);
-
   wr->set_peer_stat(osd->get_my_stat_for(now, dest));
-  
   osd->messenger->send_message(wr, osd->osdmap->get_inst(dest));
 }
 
-ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op)
+ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv)
 {
   dout(10) << "new_rep_gather rep_tid " << op->get_rep_tid() << " on " << *op << dendl;
-  int whoami = osd->get_nodeid();
-
-  RepGather *repop = new RepGather(op, op->get_rep_tid(), 
-                                               op->get_version(), 
-                                               info.last_complete);
+  RepGather *repop = new RepGather(op, rep_tid, nv, info.last_complete);
   
   // osds. commits all come to me.
   for (unsigned i=0; i<acting.size(); i++) {
@@ -943,33 +922,15 @@ ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op)
     repop->waitfor_commit.insert(osd);
   }
 
-  // acks vary:
-  if (g_conf.osd_rep == OSD_REP_CHAIN) {
-    // chain rep. 
-    // there's my local ack...
-    repop->osds.insert(whoami);
-    repop->waitfor_ack.insert(whoami);
-    repop->waitfor_commit.insert(whoami);
-
-    // also, the previous guy will ack to me
-    int myrank = osd->osdmap->calc_pg_rank(whoami, acting);
-    if (myrank > 0) {
-      int osd = acting[ myrank-1 ];
-      repop->osds.insert(osd);
-      repop->waitfor_ack.insert(osd);
-      repop->waitfor_commit.insert(osd);
-    }
-  } else {
-    // primary, splay.  all osds ack to me.
-    for (unsigned i=0; i<acting.size(); i++) {
-      int osd = acting[i];
-      repop->waitfor_ack.insert(osd);
-    }
+  // primary.  all osds ack to me.
+  for (unsigned i=0; i<acting.size(); i++) {
+    int osd = acting[i];
+    repop->waitfor_ack.insert(osd);
   }
-
+  
   repop->start = g_clock.now();
 
-  rep_gather[ repop->rep_tid ] = repop;
+  rep_gather[repop->rep_tid] = repop;
 
   // anyone waiting?  (acks that got here before the op did)
   if (waiting_for_repop.count(repop->rep_tid)) {
@@ -1102,7 +1063,7 @@ objectrev_t ReplicatedPG::assign_version(MOSDOp *op)
 class C_OSD_RepModifyCommit : public Context {
 public:
   ReplicatedPG *pg;
-  MOSDOp *op;
+  MOSDSubOp *op;
   int destosd;
 
   eversion_t pg_last_complete;
@@ -1112,7 +1073,7 @@ public:
   bool acked;
   bool waiting;
 
-  C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDOp *oo, int dosd, eversion_t lc) : 
+  C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDSubOp *oo, int dosd, eversion_t lc) : 
     pg(p), op(oo), destosd(dosd), pg_last_complete(lc),
     acked(false), waiting(false) { 
     pg->get();  // we're copying the pointer.
@@ -1128,7 +1089,7 @@ public:
     lock.Unlock();
 
     pg->lock();
-    pg->op_rep_modify_commit(op, destosd, pg_last_complete);
+    pg->sub_op_modify_commit(op, destosd, pg_last_complete);
     pg->put_unlock();
   }
   void ack() {
@@ -1224,95 +1185,50 @@ void ReplicatedPG::op_modify(MOSDOp *op)
   utime_t now = g_clock.now();
 
   // issue replica writes
-  RepGather *repop = 0;
-  bool alone = (acting.size() == 1);
   tid_t rep_tid = osd->get_tid();
-  op->set_rep_tid(rep_tid);
-
-  if (g_conf.osd_rep == OSD_REP_CHAIN && !alone) {
-    // chain rep.  send to #2 only.
-    int next = acting[1];
-    if (acting.size() > 2)
-      next = acting[2];
-    issue_repop(op, next, now);
-  } 
-  else if (g_conf.osd_rep == OSD_REP_SPLAY && !alone) {
-    // splay rep.  send to rest.
-    for (unsigned i=1; i<acting.size(); ++i)
-    //for (unsigned i=acting.size()-1; i>=1; --i)
-      issue_repop(op, acting[i], now);
-  } else {
-    // primary rep, or alone.
-    repop = new_rep_gather(op);
+  RepGather *repop = new_rep_gather(op, rep_tid, nv);
+  for (unsigned i=1; i<acting.size(); i++)
+    issue_repop(repop, acting[i], now);
 
-    // send to rest.
-    if (!alone)
-      for (unsigned i=1; i<acting.size(); i++)
-        issue_repop(op, acting[i], now);
+  // we are acker.
+  if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
+    // log and update later.
+    pobject_t poid = oid;
+    prepare_log_transaction(repop->t, op->get_reqid(), poid, op->get_op(), nv,
+                           crev, op->get_rev(), peers_complete_thru);
+    prepare_op_transaction(repop->t, op->get_reqid(),
+                          info.pgid, op->get_op(), poid, 
+                          op->get_offset(), op->get_length(), op->get_data(),
+                          nv, crev, op->get_rev());
   }
-
-  if (repop) {    
-    // we are acker.
-    if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
-      // log and update later.
-      prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), peers_complete_thru);
-      prepare_op_transaction(repop->t, op, nv, crev, op->get_rev());
-    }
-
-    // (logical) local ack.
-    // (if alone, this will apply the update.)
-    get_rep_gather(repop);
-    {
-      assert(repop->waitfor_ack.count(whoami));
-      repop->waitfor_ack.erase(whoami);
-    }
-    put_rep_gather(repop);
-
-  } else {
-    // not acker.  
-    // chain or splay.  apply.
-    ObjectStore::Transaction t;
-    prepare_log_transaction(t, op, nv, crev, op->get_rev(), peers_complete_thru);
-    prepare_op_transaction(t, op, nv, crev, op->get_rev());
-
-    C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, get_acker(), 
-                                                                info.last_complete);
-    unsigned r = osd->store->apply_transaction(t, oncommit);
-    if (r != 0 &&   // no errors
-        r != 2) {   // or error on collection_add
-      derr(0) << "error applying transaction: r = " << r << dendl;
-      assert(r == 0);
-    }
-
-    // lets evict the data from our cache to maintain a total large cache size
-    if (g_conf.osd_exclusive_caching)
-      osd->store->trim_from_cache(op->get_oid(), op->get_offset(), op->get_length());
-
-    oncommit->ack();
+  
+  // (logical) local ack.
+  // (if alone, this will apply the update.)
+  get_rep_gather(repop);
+  {
+    assert(repop->waitfor_ack.count(whoami));
+    repop->waitfor_ack.erase(whoami);
   }
-
+  put_rep_gather(repop);
 }
 
 
 
-// replicated 
-
-
-
+// sub op modify
 
-void ReplicatedPG::op_rep_modify(MOSDOp *op)
+void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
 {
-  object_t oid = op->get_oid();
+  pobject_t poid = op->get_poid();
   eversion_t nv = op->get_version();
 
   const char *opname = MOSDOp::get_opname(op->get_op());
 
   // check crev
   objectrev_t crev = 0;
-  osd->store->getattr(oid, "crev", (char*)&crev, sizeof(crev));
+  osd->store->getattr(poid, "crev", (char*)&crev, sizeof(crev));
 
-  dout(10) << "op_rep_modify " << opname 
-           << " " << oid 
+  dout(10) << "sub_op_modify " << opname 
+           << " " << poid 
            << " v " << nv 
            << " " << op->get_offset() << "~" << op->get_length()
            << dendl;  
@@ -1322,113 +1238,52 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op)
   osd->take_peer_stat(fromosd, op->get_peer_stat());
 
   // we better not be missing this.
-  assert(!missing.is_missing(oid));
+  assert(!missing.is_missing(poid.oid));
 
   // prepare our transaction
   ObjectStore::Transaction t;
 
-  // am i acker?
-  RepGather *repop = 0;
+  // do op
   int ackerosd = acting[0];
-
-  if ((g_conf.osd_rep == OSD_REP_CHAIN || g_conf.osd_rep == OSD_REP_SPLAY)) {
-    ackerosd = get_acker();
-  
-    if (is_acker()) {
-      // i am tail acker.
-      if (rep_gather.count(op->get_rep_tid())) {
-        repop = rep_gather[ op->get_rep_tid() ];
-      } else {
-        repop = new_rep_gather(op);
-      }
-      
-      // infer ack from source
-      get_rep_gather(repop);
-      {
-        //assert(repop->waitfor_ack.count(fromosd));   // no, we may come thru here twice.
-        repop->waitfor_ack.erase(fromosd);
-      }
-      put_rep_gather(repop);
-
-      // prepare dest socket
-      //messenger->prepare_send_message(op->get_client());
-    }
-
-    // chain?  forward?
-    if (g_conf.osd_rep == OSD_REP_CHAIN && !is_acker()) {
-      // chain rep, not at the tail yet.
-      int myrank = osd->osdmap->calc_pg_rank(osd->get_nodeid(), acting);
-      int next = myrank+1;
-      if (next == (int)acting.size())
-       next = 1;
-      issue_repop(op, acting[next], g_clock.now());    
-    }
-  }
-
-  // do op?
-  C_OSD_RepModifyCommit *oncommit = 0;
-
   osd->logger->inc("r_wr");
   osd->logger->inc("r_wrb", op->get_length());
   
-  if (repop) {
-    // acker.  we'll apply later.
-    if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
-      prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), op->get_pg_trim_to());
-      prepare_op_transaction(repop->t, op, nv, crev, op->get_rev());
-    }
-  } else {
-    // middle|replica.
-    if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
-      prepare_log_transaction(t, op, nv, crev, op->get_rev(), op->get_pg_trim_to());
-      prepare_op_transaction(t, op, nv, crev, op->get_rev());
-    }
-
-    oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete);
-
-    // apply log update. and possibly update itself.
-    unsigned tr = osd->store->apply_transaction(t, oncommit);
-    if (tr != 0 &&   // no errors
-        tr != 2) {   // or error on collection_add
-      derr(0) << "error applying transaction: r = " << tr << dendl;
-      assert(tr == 0);
-    }
+  if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
+    prepare_log_transaction(t, op->get_reqid(), op->get_poid(), op->get_op(), op->get_version(),
+                           crev, 0, op->get_pg_trim_to());
+    prepare_op_transaction(t, op->get_reqid(), 
+                          info.pgid, op->get_op(), poid, 
+                          op->get_offset(), op->get_length(), op->get_data(), 
+                          nv, crev, 0);
   }
   
-  // ack?
-  if (repop) {
-    // (logical) local ack.  this may induce the actual update.
-    get_rep_gather(repop);
-    {
-      assert(repop->waitfor_ack.count(osd->get_nodeid()));
-      repop->waitfor_ack.erase(osd->get_nodeid());
-    }
-    put_rep_gather(repop);
-  } 
-  else {
-    // send ack to acker?
-    if (g_conf.osd_rep != OSD_REP_CHAIN) {
-      MOSDOpReply *ack = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), false);
-      ack->set_peer_stat(osd->get_my_stat_for(g_clock.now(), ackerosd));
-      osd->messenger->send_message(ack, osd->osdmap->get_inst(ackerosd));
-    }
-
-    // ack myself.
-    assert(oncommit);
-    oncommit->ack(); 
+  C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete);
+  
+  // apply log update. and possibly update itself.
+  unsigned tr = osd->store->apply_transaction(t, oncommit);
+  if (tr != 0 &&   // no errors
+      tr != 2) {   // or error on collection_add
+    derr(0) << "error applying transaction: r = " << tr << dendl;
+    assert(tr == 0);
   }
-
+  
+  // send ack to acker
+  MOSDSubOpReply *ack = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), false);
+  ack->set_peer_stat(osd->get_my_stat_for(g_clock.now(), ackerosd));
+  osd->messenger->send_message(ack, osd->osdmap->get_inst(ackerosd));
+  
+  // ack myself.
+  oncommit->ack(); 
 }
 
-
-void ReplicatedPG::op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete)
+void ReplicatedPG::sub_op_modify_commit(MOSDSubOp *op, int ackerosd, eversion_t last_complete)
 {
   // send commit.
   dout(10) << "rep_modify_commit on op " << *op
            << ", sending commit to osd" << ackerosd
            << dendl;
   if (osd->osdmap->is_up(ackerosd)) {
-    MOSDOpReply *commit = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true);
+    MOSDSubOpReply *commit = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), true);
     commit->set_pg_complete_thru(last_complete);
     commit->set_peer_stat(osd->get_my_stat_for(g_clock.now(), ackerosd));
     osd->messenger->send_message(commit, osd->osdmap->get_inst(ackerosd));
@@ -1436,6 +1291,27 @@ void ReplicatedPG::op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t las
   }
 }
 
+void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r)
+{
+  // must be replication.
+  tid_t rep_tid = r->get_rep_tid();
+  int fromosd = r->get_source().num();
+  
+  osd->take_peer_stat(fromosd, r->get_peer_stat());
+  
+  if (rep_gather.count(rep_tid)) {
+    // oh, good.
+    repop_ack(rep_gather[rep_tid], 
+             r->get_result(), r->get_commit(), 
+             fromosd, 
+             r->get_pg_complete_thru());
+    delete r;
+  } else {
+    // early ack.
+    waiting_for_repop[rep_tid].push_back(r);
+  }
+}
+
 
 
 
@@ -1449,36 +1325,35 @@ void ReplicatedPG::op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t las
 
 /** pull - request object from a peer
  */
-void ReplicatedPG::pull(object_t oid)
+void ReplicatedPG::pull(pobject_t poid)
 {
-  assert(missing.loc.count(oid));
-  eversion_t v = missing.missing[oid];
-  int fromosd = missing.loc[oid];
+  assert(missing.loc.count(poid.oid));
+  eversion_t v = missing.missing[poid.oid];
+  int fromosd = missing.loc[poid.oid];
   
-  dout(7) << "pull " << oid
+  dout(7) << "pull " << poid
           << " v " << v 
           << " from osd" << fromosd
           << dendl;
 
   // send op
+  osdreqid_t rid;
   tid_t tid = osd->get_tid();
-  MOSDOp *op = new MOSDOp(osd->messenger->get_myinst(), 0, tid,
-                          oid, info.pgid,
-                          osd->osdmap->get_epoch(),
-                          CEPH_OSD_OP_PULL);
-  op->set_version(v);
-  osd->messenger->send_message(op, osd->osdmap->get_inst(fromosd));
+  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PULL,
+                                  0, 0, 
+                                  osd->osdmap->get_epoch(), tid, v);
+  osd->messenger->send_message(subop, osd->osdmap->get_inst(fromosd));
   
   // take note
-  assert(objects_pulling.count(oid) == 0);
+  assert(objects_pulling.count(poid.oid) == 0);
   num_pulling++;
-  objects_pulling[oid] = v;
+  objects_pulling[poid.oid] = v;
 }
 
 
 /** push - send object to a peer
  */
-void ReplicatedPG::push(object_t oid, int peer)
+void ReplicatedPG::push(pobject_t poid, int peer)
 {
   // read data+attrs
   bufferlist bl;
@@ -1487,15 +1362,15 @@ void ReplicatedPG::push(object_t oid, int peer)
   map<string,bufferptr> attrset;
   
   ObjectStore::Transaction t;
-  t.read(oid, 0, 0, &bl);
-  t.getattr(oid, "version", &v, &vlen);
-  t.getattrs(oid, attrset);
+  t.read(poid, 0, 0, &bl);
+  t.getattr(poid, "version", &v, &vlen);
+  t.getattrs(poid, attrset);
   unsigned tr = osd->store->apply_transaction(t);
   
   assert(tr == 0);  // !!!
 
   // ok
-  dout(7) << "push " << oid << " v " << v 
+  dout(7) << "push " << poid << " v " << v 
           << " size " << bl.length()
           << " to osd" << peer
           << dendl;
@@ -1504,36 +1379,59 @@ void ReplicatedPG::push(object_t oid, int peer)
   osd->logger->inc("r_pushb", bl.length());
   
   // send
-  MOSDOp *op = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(),
-                          oid, info.pgid, osd->osdmap->get_epoch(), 
-                          CEPH_OSD_OP_PUSH); 
-  op->set_offset(0);
-  op->set_length(bl.length());
-  op->set_data(bl);   // note: claims bl, set length above here!
-  op->set_version(v);
-  op->set_attrset(attrset);
-  
-  osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
+  osdreqid_t rid;  // useless?
+  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PUSH, 0, bl.length(),
+                               osd->osdmap->get_epoch(), osd->get_tid(), v);
+  subop->set_data(bl);   // note: claims bl, set length above here!
+  subop->set_attrset(attrset);
+  osd->messenger->send_message(subop, osd->osdmap->get_inst(peer));
   
   if (is_primary()) {
-    peer_missing[peer].got(oid);
-    pushing[oid].insert(peer);
+    peer_missing[peer].got(poid.oid);
+    pushing[poid.oid].insert(peer);
   }
 }
 
+void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
+{
+  dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
+  
+  int peer = reply->get_source().num();
+  pobject_t poid = reply->get_poid();
+  
+  if (pushing.count(poid.oid) &&
+      pushing[poid.oid].count(peer)) {
+    pushing[poid.oid].erase(peer);
+
+    if (peer_missing.count(peer) == 0 ||
+        peer_missing[peer].num_missing() == 0) 
+      uptodate_set.insert(peer);
+
+    if (pushing[poid.oid].empty()) {
+      dout(10) << "pushed " << poid << " to all replicas" << dendl;
+      do_peer_recovery();
+    } else {
+      dout(10) << "pushed " << poid << ", still waiting for push ack from " 
+              << pushing[poid.oid] << dendl;
+    }
+  } else {
+    dout(10) << "huh, i wasn't pushing " << poid << dendl;
+  }
+  delete reply;
+}
 
 
 /** op_pull
  * process request to pull an entire object.
  * NOTE: called from opqueue.
  */
-void ReplicatedPG::op_pull(MOSDOp *op)
+void ReplicatedPG::sub_op_pull(MOSDSubOp *op)
 {
-  const object_t oid = op->get_oid();
+  const pobject_t poid = op->get_poid();
   const eversion_t v = op->get_version();
   int from = op->get_source().num();
 
-  dout(7) << "op_pull " << oid << " v " << op->get_version()
+  dout(7) << "op_pull " << poid << " v " << op->get_version()
           << " from " << op->get_source()
           << dendl;
 
@@ -1542,46 +1440,46 @@ void ReplicatedPG::op_pull(MOSDOp *op)
     // primary
     assert(peer_missing.count(from));  // we had better know this, from the peering process.
 
-    if (!peer_missing[from].is_missing(oid)) {
+    if (!peer_missing[from].is_missing(poid.oid)) {
       dout(7) << "op_pull replica isn't actually missing it, we must have already pushed to them" << dendl;
       delete op;
       return;
     }
 
     // do we have it yet?
-    if (is_missing_object(oid)) {
-      wait_for_missing_object(oid, op);
+    if (is_missing_object(poid.oid)) {
+      wait_for_missing_object(poid.oid, op);
       return;
     }
   } else {
     // non-primary
-    if (missing.is_missing(oid)) {
-      dout(7) << "op_pull not primary, and missing " << oid << ", ignoring" << dendl;
+    if (missing.is_missing(poid.oid)) {
+      dout(7) << "op_pull not primary, and missing " << poid << ", ignoring" << dendl;
       delete op;
       return;
     }
   }
     
   // push it back!
-  push(oid, op->get_source().num());
+  push(poid, op->get_source().num());
 }
 
 
 /** op_push
  * NOTE: called from opqueue.
  */
-void ReplicatedPG::op_push(MOSDOp *op)
+void ReplicatedPG::sub_op_push(MOSDSubOp *op)
 {
-  object_t oid = op->get_oid();
+  pobject_t poid = op->get_poid();
   eversion_t v = op->get_version();
 
-  if (!is_missing_object(oid)) {
-    dout(7) << "op_push not missing " << oid << dendl;
+  if (!is_missing_object(poid.oid)) {
+    dout(7) << "sub_op_push not missing " << poid << dendl;
     return;
   }
   
   dout(7) << "op_push " 
-          << oid 
+          << poid 
           << " v " << v 
           << " size " << op->get_length() << " " << op->get_data().length()
           << dendl;
@@ -1590,16 +1488,16 @@ void ReplicatedPG::op_push(MOSDOp *op)
   
   // write object and add it to the PG
   ObjectStore::Transaction t;
-  t.remove(oid);  // in case old version exists
-  t.write(oid, 0, op->get_length(), op->get_data());
-  t.setattrs(oid, op->get_attrset());
-  t.collection_add(info.pgid, oid);
+  t.remove(poid);  // in case old version exists
+  t.write(poid, 0, op->get_length(), op->get_data());
+  t.setattrs(poid, op->get_attrset());
+  t.collection_add(info.pgid, poid);
 
   // close out pull op?
   num_pulling--;
-  if (objects_pulling.count(oid))
-    objects_pulling.erase(oid);
-  missing.got(oid, v);
+  if (objects_pulling.count(poid.oid))
+    objects_pulling.erase(poid.oid);
+  missing.got(poid.oid, v);
 
 
   // raise last_complete?
@@ -1625,15 +1523,15 @@ void ReplicatedPG::op_push(MOSDOp *op)
     for (unsigned i=1; i<acting.size(); i++) {
       int peer = acting[i];
       assert(peer_missing.count(peer));
-      if (peer_missing[peer].is_missing(oid)) 
-       push(oid, peer);  // ok, push it, and they (will) have it now.
+      if (peer_missing[peer].is_missing(poid.oid)) 
+       push(poid, peer);  // ok, push it, and they (will) have it now.
     }
   }
 
   // kick waiters
-  if (waiting_for_missing_object.count(oid)) {
-    osd->take_waiters(waiting_for_missing_object[oid]);
-    waiting_for_missing_object.erase(oid);
+  if (waiting_for_missing_object.count(poid.oid)) {
+    osd->take_waiters(waiting_for_missing_object[poid.oid]);
+    waiting_for_missing_object.erase(poid.oid);
   }
 
   if (is_primary()) {
@@ -1641,7 +1539,7 @@ void ReplicatedPG::op_push(MOSDOp *op)
     do_recovery();
   } else {
     // ack if i'm a replica and being pushed to.
-    MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); 
+    MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), false); 
     osd->messenger->send_message(reply, op->get_source_inst());
   }
 
@@ -1650,8 +1548,9 @@ void ReplicatedPG::op_push(MOSDOp *op)
 
 
 
-
-
+/*
+ * pg status change notification
+ */
 
 void ReplicatedPG::on_osd_failure(int o)
 {
@@ -1672,7 +1571,6 @@ void ReplicatedPG::on_osd_failure(int o)
     repop_ack(*p, -1, true, o);
 }
 
-
 void ReplicatedPG::on_acker_change()
 {
   dout(10) << "on_acker_change" << dendl;
@@ -1682,43 +1580,15 @@ void ReplicatedPG::on_change()
 {
   dout(10) << "on_change" << dendl;
 
-  if (g_conf.osd_rep == OSD_REP_PRIMARY ||
-      g_conf.osd_rep == OSD_REP_SPLAY) {
-    // apply all local repops
-    //  (pg is inactive; we will repeer)
-    for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
-        p != rep_gather.end();
-        p++) 
-      if (!p->second->applied)
-       apply_repop(p->second);
-  }
-  else if (g_conf.osd_rep == OSD_REP_CHAIN) {
-    // apply all local repops
-    //  (pg is inactive; we will repeer)
-    //  note: because we hose rep_gather, clients must resubmit ops on ANY pg membership change.
-    for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
-        p != rep_gather.end();
-        p++) {
-      if (!p->second->applied)
-       apply_repop(p->second);
-      delete p->second->op;
-      delete p->second;
-    }
-    rep_gather.clear();
-    
-    // and discard repop waiters (chain/splay artifact)
-    for (hash_map<tid_t, list<Message*> >::iterator p = waiting_for_repop.begin();
-        p != waiting_for_repop.end();
-        p++)
-      for (list<Message*>::iterator pm = p->second.begin();
-          pm != p->second.end();
-          pm++)
-       delete *pm;
-    waiting_for_repop.clear();
-  }
+  // apply all local repops
+  //  (pg is inactive; we will repeer)
+  for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
+       p != rep_gather.end();
+       p++) 
+    if (!p->second->applied)
+      apply_repop(p->second);
 }
 
-
 void ReplicatedPG::on_role_change()
 {
   dout(10) << "on_role_change" << dendl;
@@ -1737,81 +1607,6 @@ void ReplicatedPG::on_role_change()
 
 
 
-
-
-/** clean_up_local
- * remove any objects that we're storing but shouldn't.
- * as determined by log.
- */
-void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t)
-{
-  dout(10) << "clean_up_local" << dendl;
-
-  assert(info.last_update >= log.bottom);  // otherwise we need some help!
-
-  if (log.backlog) {
-
-    // FIXME: sloppy pobject vs object conversions abound!  ***
-    
-    // be thorough.
-    list<pobject_t> ls;
-    osd->store->collection_list(info.pgid, ls);
-    set<object_t> s;
-    
-    for (list<pobject_t>::iterator i = ls.begin();
-         i != ls.end();
-         i++) 
-      s.insert(i->oid); 
-
-    set<object_t> did;
-    for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
-         p != log.log.rend();
-         p++) {
-      if (did.count(p->oid)) continue;
-      did.insert(p->oid);
-      
-      if (p->is_delete()) {
-        if (s.count(p->oid)) {
-          dout(10) << " deleting " << p->oid
-                   << " when " << p->version << dendl;
-          t.remove(p->oid);
-        }
-        s.erase(p->oid);
-      } else {
-        // just leave old objects.. they're missing or whatever
-        s.erase(p->oid);
-      }
-    }
-
-    for (set<object_t>::iterator i = s.begin(); 
-         i != s.end();
-         i++) {
-      dout(10) << " deleting stray " << *i << dendl;
-      t.remove(*i);
-    }
-
-  } else {
-    // just scan the log.
-    set<object_t> did;
-    for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
-         p != log.log.rend();
-         p++) {
-      if (did.count(p->oid)) continue;
-      did.insert(p->oid);
-
-      if (p->is_delete()) {
-        dout(10) << " deleting " << p->oid
-                 << " when " << p->version << dendl;
-        t.remove(p->oid);
-      } else {
-        // keep old(+missing) objects, just for kicks.
-      }
-    }
-  }
-}
-
-
-
 void ReplicatedPG::cancel_recovery()
 {
   // forget about where missing items are, or anything we're pulling
@@ -1943,34 +1738,6 @@ void ReplicatedPG::do_peer_recovery()
     finish_recovery();
 }
 
-void ReplicatedPG::op_push_reply(MOSDOpReply *reply)
-{
-  dout(10) << "op_push_reply from " << reply->get_source() << " " << *reply << dendl;
-  
-  int peer = reply->get_source().num();
-  object_t oid = reply->get_oid();
-  
-  if (pushing.count(oid) &&
-      pushing[oid].count(peer)) {
-    pushing[oid].erase(peer);
-
-    if (peer_missing.count(peer) == 0 ||
-        peer_missing[peer].num_missing() == 0) 
-      uptodate_set.insert(peer);
-
-    if (pushing[oid].empty()) {
-      dout(10) << "pushed " << oid << " to all replicas" << dendl;
-      do_peer_recovery();
-    } else {
-      dout(10) << "pushed " << oid << ", still waiting for push ack from " 
-              << pushing[oid] << dendl;
-    }
-  } else {
-    dout(10) << "huh, i wasn't pushing " << oid << dendl;
-  }
-  delete reply;
-}
-
 void ReplicatedPG::purge_strays()
 {
   dout(10) << "purge_strays " << stray_set << dendl;
@@ -1988,3 +1755,75 @@ void ReplicatedPG::purge_strays()
   stray_set.clear();
 }
 
+
+
+/** clean_up_local
+ * remove any objects that we're storing but shouldn't.
+ * as determined by log.
+ */
+void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t)
+{
+  dout(10) << "clean_up_local" << dendl;
+
+  assert(info.last_update >= log.bottom);  // otherwise we need some help!
+
+  if (log.backlog) {
+
+    // FIXME: sloppy pobject vs object conversions abound!  ***
+    
+    // be thorough.
+    list<pobject_t> ls;
+    osd->store->collection_list(info.pgid, ls);
+    set<object_t> s;
+    
+    for (list<pobject_t>::iterator i = ls.begin();
+         i != ls.end();
+         i++) 
+      s.insert(i->oid); 
+
+    set<object_t> did;
+    for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
+         p != log.log.rend();
+         p++) {
+      if (did.count(p->oid)) continue;
+      did.insert(p->oid);
+      
+      if (p->is_delete()) {
+        if (s.count(p->oid)) {
+          dout(10) << " deleting " << p->oid
+                   << " when " << p->version << dendl;
+          t.remove(p->oid);
+        }
+        s.erase(p->oid);
+      } else {
+        // just leave old objects.. they're missing or whatever
+        s.erase(p->oid);
+      }
+    }
+
+    for (set<object_t>::iterator i = s.begin(); 
+         i != s.end();
+         i++) {
+      dout(10) << " deleting stray " << *i << dendl;
+      t.remove(*i);
+    }
+
+  } else {
+    // just scan the log.
+    set<object_t> did;
+    for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
+         p != log.log.rend();
+         p++) {
+      if (did.count(p->oid)) continue;
+      did.insert(p->oid);
+
+      if (p->is_delete()) {
+        dout(10) << " deleting " << p->oid
+                 << " when " << p->version << dendl;
+        t.remove(p->oid);
+      } else {
+        // keep old(+missing) objects, just for kicks.
+      }
+    }
+  }
+}
index 21753debe0922766300da64f19c2a156ed8da72e..16aa7ea4a091559079f2dbc86811ea99b7517a62 100644 (file)
@@ -18,7 +18,8 @@
 #include "PG.h"
 
 #include "messages/MOSDOp.h"
-
+class MOSDSubOp;
+class MOSDSubOpReply;
 
 class ReplicatedPG : public PG {
 public:  
@@ -80,8 +81,8 @@ protected:
   void get_rep_gather(RepGather*);
   void apply_repop(RepGather *repop);
   void put_rep_gather(RepGather*);
-  void issue_repop(MOSDOp *op, int osd, utime_t now);
-  RepGather *new_rep_gather(MOSDOp *op);
+  void issue_repop(RepGather *repop, int dest, utime_t now);
+  RepGather *new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv);
   void repop_ack(RepGather *repop,
                  int result, bool commit,
                  int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
@@ -90,21 +91,22 @@ protected:
   int num_pulling;
   map<object_t, set<int> > pushing;
 
-  void push(object_t oid, int dest);
-  void pull(object_t oid);
+  void push(pobject_t oid, int dest);
+  void pull(pobject_t oid);
 
   // modify
   objectrev_t assign_version(MOSDOp *op);
   void op_modify_commit(tid_t rep_tid, eversion_t pg_complete_thru);
-  void op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete);
+  void sub_op_modify_commit(MOSDSubOp *op, int ackerosd, eversion_t last_complete);
 
   void prepare_log_transaction(ObjectStore::Transaction& t, 
-                              MOSDOp *op, eversion_t& version, 
+                              osdreqid_t reqid, pobject_t poid, int op, eversion_t version,
                               objectrev_t crev, objectrev_t rev,
                               eversion_t trim_to);
-  void prepare_op_transaction(ObjectStore::Transaction& t, 
-                             MOSDOp *op, eversion_t& version, 
-                             objectrev_t crev, objectrev_t rev);
+  void prepare_op_transaction(ObjectStore::Transaction& t, const osdreqid_t& reqid,
+                             pg_t pgid, int op, pobject_t poid, 
+                             off_t offset, off_t length, bufferlist& bl,
+                             eversion_t& version, objectrev_t crev, objectrev_t rev);
 
   friend class C_OSD_ModifyCommit;
   friend class C_OSD_RepModifyCommit;
@@ -122,11 +124,12 @@ protected:
 
   void op_read(MOSDOp *op);
   void op_modify(MOSDOp *op);
-  void op_rep_modify(MOSDOp *op);
-  void op_push(MOSDOp *op);
-  void op_pull(MOSDOp *op);
 
-  void op_push_reply(MOSDOpReply *reply);
+  void sub_op_modify(MOSDSubOp *op);
+  void sub_op_modify_reply(MOSDSubOpReply *reply);
+  void sub_op_push(MOSDSubOp *op);
+  void sub_op_push_reply(MOSDSubOpReply *reply);
+  void sub_op_pull(MOSDSubOp *op);
 
 
 public:
@@ -138,14 +141,15 @@ public:
 
   bool preprocess_op(MOSDOp *op, utime_t now);
   void do_op(MOSDOp *op);
-  void do_op_reply(MOSDOpReply *r);
+  void do_sub_op(MOSDSubOp *op);
+  void do_sub_op_reply(MOSDSubOpReply *op);
 
   bool same_for_read_since(epoch_t e);
   bool same_for_modify_since(epoch_t e);
   bool same_for_rep_modify_since(epoch_t e);
 
   bool is_missing_object(object_t oid);
-  void wait_for_missing_object(object_t oid, MOSDOp *op);
+  void wait_for_missing_object(object_t oid, Message *op);
 
   void on_osd_failure(int o);
   void on_acker_change();