]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
newrepop merge
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sun, 10 Sep 2006 03:43:27 +0000 (03:43 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sun, 10 Sep 2006 03:43:27 +0000 (03:43 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@850 29311d96-e01e-0410-9327-a35deaab8ce9

24 files changed:
ceph/Makefile
ceph/TODO
ceph/common/Clock.h
ceph/common/Cond.h
ceph/config.cc
ceph/config.h
ceph/ebofs/Ebofs.cc
ceph/jobs/rados/map_dist
ceph/jobs/rados/rep_lat [new file with mode: 0644]
ceph/messages/MOSDOp.h
ceph/messages/MOSDOpReply.h
ceph/msg/Messenger.h
ceph/msg/NewMessenger.cc
ceph/msg/NewMessenger.h
ceph/msg/new_mpistarter.cc
ceph/newsyn.cc
ceph/osd/Ager.cc
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/OSDMap.h
ceph/osd/PG.cc
ceph/osd/PG.h
ceph/osdc/Objecter.cc
ceph/osdc/Objecter.h

index 4ad5a92f092a03fda630140af02146d1e77a26ac..e130573e6926012318f270a2df825741227c21b9 100644 (file)
@@ -120,10 +120,7 @@ mpifuse: mpifuse.cc mds.o client.o client/fuse.o ${TCP_OBJS} common.o
 
 
 # synthetic workload
-fakemon: fakemon.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o
-       ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
-
-fakesyn: fakesyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o
+fakesyn: fakesyn.o mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o
        ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
 
 tcpsyn: tcpsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o ${TCP_OBJS} common.o
index 228e153da0f25a3a7084e0889f585922f16a6f4f..5eda56c1ee32a856db0a2131c3d95a5c8f36f566 100644 (file)
--- a/ceph/TODO
+++ b/ceph/TODO
@@ -1,121 +1,4 @@
 
-== FAST rados paper
-
-todo
-
-
-
-1- finish ebofs fragmetnation experiment
-
- -> do rd_age for the 200gb partition.  
-(hopefully, we can show that our allocation strategy helps reads at the expense of writes)
-
-2- age ext2 overnight to do a wishy-washy comparison
-
-3- better map_dist plot.
-
-
-
-
-1- get ebofs frag experiment running.
-
-/2- figure out rep_lat issue?
-/ 19 - perfect for 1mb writes
-
-3- ebofs time frag plot
-
-4- map dist storm.
-
-
-
-EASY
-- ebofs tput vs frag
-
-HARD
-- aged fakestore
-  - vs wrsizes
-
-- ebofs frag versus time plot
-
-
-
-
-
-cluster map
-- map distribution
-
-data distribution
- placement groups
-
-replication operation
-
-failure detection
- osd monitor
- up/down/in/out
-
-pg logging
- top, bottom, trimming, etc.
- caller stamps -> idempotent operations
-
-pg recovery
- wrt logs, objects
- wrt active ops
-
-
-ebofs
- on disk layout
- data safety
- storage interface
-
-
--- rados applications 
-- files
-- b-link trees
-- distributed hash table
-- fifo queues
-
-
-
-
-== rados client nodes
-
-why do we want client op ordering?
-- simpler logic in objectcacher
-- can pipeline lock + write + unlock, etc.
-  - (at least, wrt to single objects..)
-- all complexity is in Objecter
-  - barrier: waitfor outstanding acks whenever primary osd shifts.
-
-why don't we need it?
-- can make compound write+unlock...
-- don't actually need ordering for concurrent client ops..
-
----> we want it.
-
-
-* ObjectCacher currently assumes reads+acks+commits over an _object_ are ordered.
- -> they are.
-
-- primary changes by pg
-- actual discontinuity is over disks
-- but the ordering is over objects .. not pgs (bc of missing objects, recovery, etc.)
-
-Objecter:
-- should order acks over objects, because we want to order the updates (see below)
-
-- on map update and primary change,
- - resubmit to new primary.
-
-- only accept acks from current primary.
-- only accept commits from current primary.
-- need pg map
-  - to detect primary changes,
-  - pg crashes
-
-
-
-
 == todo
 
 - how to get usage feedback to monitor?
@@ -125,11 +8,18 @@ messenger
 - lookup upcall,
 - distributed namer (eg send to MSG_ADDR_MON_ANY)
 - failure reporting
+- generalize out a transport layer?  
+  - eg reliable tcp for most things, connectionless unreliable datagrams for monitors?
+  - or, aggressive connection closing on monitors?  or just max_connections and an lru?
 - share same tcp socket for sender and receiver?
 - close idle connections?
 - osds: forget idle client addrs
 
 osd/rados
+- clean up writeahead logs
+- fix heartbeat wrt new replication
+- figure out new rep failure cases
+- same_primary_since -> same_tail_since
 - mark residual pgs obsolete  ***
 - deal with divergent disconnected primaries
 - rdlocks
@@ -143,6 +33,7 @@ monitor
 - watch osd utilization; adjust overload in cluster map
 
 objecter
+- handle new rep mode failure modes...  head, tail, middle
 
 objectcacher
 - ocacher flushing
index 4ee2fcd6fe5a2e43dc537bd5d8c561fa7b41d95e..2b369c71098dbe78520afa1188e1f23f12f58929 100644 (file)
@@ -132,14 +132,18 @@ class Clock {
   // relative time (from startup)
   const utime_t& now() {
        gettimeofday(&last.timeval(), NULL);
-       //last -= zero;
+       last -= zero;
        //last = abs_last - start_offset;
        return last;
   }
-  const utime_t real_now() {
-       utime_t now;
-       gettimeofday(&now.timeval(), NULL);
-       return now;
+
+  void realify(utime_t& t) {
+       t += zero;
+  }
+  utime_t real_now() {
+       utime_t realnow;
+       gettimeofday(&realnow.timeval(), NULL);
+       return realnow;
   }
 
   const utime_t& recent_now() {
index d61a0af16a6f8d6153777c706c15174d27d13996..98cb7f9e009c46fcafd916bdcc1c28c21b642cd4 100644 (file)
@@ -61,6 +61,9 @@ class Cond
   }
 
   int WaitUntil(Mutex &mutex, utime_t when) {
+       // make sure it's _real_ time
+       g_clock.realify(when);
+
        // timeval -> timespec
        struct timespec ts;
        memset(&ts, 0, sizeof(ts));
@@ -71,7 +74,7 @@ class Cond
        return r;
   }
   int WaitInterval(Mutex &mutex, utime_t interval) {
-       utime_t when = g_clock.real_now();
+       utime_t when = g_clock.now();
        when += interval;
        return WaitUntil(mutex, when);
   }
index b243181b622beb9c5112ef3d322aadf0cc8b89db..3e5368b79432f36a8f017374b1d9191a65636ea5 100644 (file)
@@ -179,6 +179,7 @@ md_config_t g_conf = {
 
 
   // --- osd ---
+  osd_rep: OSD_REP_PRIMARY,
   osd_pg_bits: 8,
   osd_object_layout: OBJECT_LAYOUT_HASHINO,
   osd_pg_layout: PG_LAYOUT_CRUSH,
@@ -587,6 +588,15 @@ void parse_config_options(vector<char*>& args)
          g_conf.osd_maxthreads = 1;   // until feng merges joel's fixes
        }
 
+       
+       else if (strcmp(args[i], "--osd_rep") == 0) 
+         g_conf.osd_rep = atoi(args[++i]);
+       else if (strcmp(args[i], "--osd_rep_chain") == 0) 
+         g_conf.osd_rep = OSD_REP_CHAIN;
+       else if (strcmp(args[i], "--osd_rep_splay") == 0) 
+         g_conf.osd_rep = OSD_REP_SPLAY;
+       else if (strcmp(args[i], "--osd_rep_primary") == 0) 
+         g_conf.osd_rep = OSD_REP_PRIMARY;
        else if (strcmp(args[i], "--osd_mkfs") == 0) 
          g_conf.osd_mkfs = atoi(args[++i]);
        else if (strcmp(args[i], "--osd_age") == 0) 
index 5b8d53c41f599fb203c47069e3258f4a0183be35..ebd0dfdd49b0f3c3a9292c61f702abfc76e12933 100644 (file)
@@ -12,6 +12,9 @@ using namespace std;
 extern map<int,float> g_fake_osd_down;
 extern map<int,float> g_fake_osd_out;
 
+#define OSD_REP_PRIMARY 0
+#define OSD_REP_SPLAY   1
+#define OSD_REP_CHAIN   2
 
 struct md_config_t {
   int  num_mon;
@@ -151,6 +154,7 @@ struct md_config_t {
 
 
   // osd
+  int   osd_rep;
   int   osd_pg_bits;
   int   osd_object_layout;
   int   osd_pg_layout;
index fd15e1912452ed9debeb04d4f1bd966434f3c8ad..9aee73f74d312f7dee8f6de8d1afb467ef14bff3 100644 (file)
@@ -1227,8 +1227,9 @@ int Ebofs::statfs(struct statfs *buf)
   buf->f_type = EBOFS_MAGIC;             /* type of filesystem */
   buf->f_bsize = 4096;                   /* optimal transfer block size */
   buf->f_blocks = dev.get_num_blocks();  /* total data blocks in file system */
-  buf->f_bfree = get_free_blocks() + get_limbo_blocks();            /* free blocks in fs */
-  buf->f_bavail = get_free_blocks();// + get_limbo_blocks();           /* free blocks avail to non-superuser */
+  buf->f_bfree = get_free_blocks() 
+       + get_limbo_blocks();                /* free blocks in fs */
+  buf->f_bavail = get_free_blocks();     /* free blocks avail to non-superuser -- actually, for writing. */
   buf->f_files = nodepool.num_total();   /* total file nodes in file system */
   buf->f_ffree = nodepool.num_free();    /* free file nodes in fs */
   //buf->f_fsid = 0;                       /* file system id */
index 02173f5767baa63caa488b015d104c4993c403e4..39f16daa1cdc273d8d74caa8ff8ff2cb2ec034fc 100644 (file)
@@ -4,7 +4,7 @@
 {
        'sleep' => 3,
 
-       'osdbits' => [6,7,8,9,10],#,9],10,11],
+       'osdbits' => [6,7,8],#,9],10,11],
        'pgperbits' => [3],#,4,5],#[4,6,8],
 
        'nummds' => 1,
diff --git a/ceph/jobs/rados/rep_lat b/ceph/jobs/rados/rep_lat
new file mode 100644 (file)
index 0000000..3f5ab0c
--- /dev/null
@@ -0,0 +1,43 @@
+#!/usr/bin/perl
+
+# hi there
+{
+       'sleep' => 3,
+
+       'nummds' => 1,
+       'numosd' => 8, #[6],
+       'numclient' => 1,#, 40, 80, 160 ],
+       'n' => 10,
+
+       'fs' => 'ebofs',
+
+       'start' => 10,
+       'end' => 40,
+       'until' => 40,  
+       'kill_after' => 45,
+       
+       'writefile' => 1,
+       'writefile_size' => [4096, 
+#                                               8*1024,
+#                                               16*1024, 
+#                                               32*1024,
+                                                64*1024,
+#                                               128*1024,
+#                                               256*1024,
+#                                               512*1024,
+#                                               1024*1024
+],
+       'writefile_mb' => 10000,
+
+       'osd_rep' => [0,1,2],
+
+       'file_layout_num_rep' => [1,2,3,4,5,6],#, 2, 3, 4],
+
+       'osd_pg_bits' => 4,
+       'custom' => '--osd_max_rep 8',
+
+       'comb' => {
+               'x' => 'file_layout_num_rep',
+               'vars' => [ 'cl.wrlat' ]
+       }
+};
index 8d73570c68289611995c37eaad8899361a9976dd..0f730f577e28035600cdda952d8b21cb7584a755 100644 (file)
@@ -17,8 +17,6 @@
 
 #include "msg/Message.h"
 
-#include "osd/OSDMap.h"
-
 /*
  * OSD op
  *
 #define OSD_OP_UPLOCK     24
 #define OSD_OP_DNLOCK     25
 
+#define OSD_OP_PULL       30
+#define OSD_OP_PUSH       31
 
-#define OSD_OP_IS_REP(x)  ((x) >= 30)
-
-// replication/recovery -- these ops are relative to a specific object version #
-#define OSD_OP_REP_WRITE    (100+OSD_OP_WRITE)     // replicated (partial object) write
-#define OSD_OP_REP_WRNOOP   (100+OSD_OP_WRNOOP)     // replicated (partial object) write
-#define OSD_OP_REP_TRUNCATE (100+OSD_OP_TRUNCATE)  // replicated truncate
-#define OSD_OP_REP_DELETE   (100+OSD_OP_DELETE)
-#define OSD_OP_REP_WRLOCK   (100+OSD_OP_WRLOCK)
-#define OSD_OP_REP_WRUNLOCK (100+OSD_OP_WRUNLOCK)
-#define OSD_OP_REP_RDLOCK   (100+OSD_OP_RDLOCK)
-#define OSD_OP_REP_RDUNLOCK (100+OSD_OP_RDUNLOCK)
-#define OSD_OP_REP_UPLOCK   (100+OSD_OP_UPLOCK)
-#define OSD_OP_REP_DNLOCK   (100+OSD_OP_DNLOCK)
-
-#define OSD_OP_REP_PULL     30   // whole object read
-//#define OSD_OP_REP_PUSH     31   // whole object write
-//#define OSD_OP_REP_REMOVE   32   // delete replica
-
-//#define OSD_OP_FLAG_TRUNCATE  1   // truncate object after end of write
 
 typedef struct {
-  tid_t tid;
   long pcid;
-  msg_addr_t asker;
 
-  tid_t      orig_tid;
-  msg_addr_t orig_asker;
+  // who's asking?
+  tid_t tid;
+  msg_addr_t client;
+
+  // for replication
+  tid_t rep_tid;
 
   object_t oid;
   pg_t pg;
-  int        pg_role;//, rg_nrep;
+
   epoch_t map_epoch;
 
   eversion_t pg_trim_to;   // primary->replica: trim to here
@@ -90,8 +73,7 @@ typedef struct {
   bool   want_ack;
   bool   want_commit;
 
-  //epoch_t _included_map_epoch;
-  size_t _data_len;//, _osdmap_len;
+  size_t _data_len;
 
 } MOSDOp_st;
 
@@ -99,6 +81,10 @@ class MOSDOp : public Message {
 public:
   static const char* get_opname(int op) {
        switch (op) {
+       case OSD_OP_READ: return "read";
+       case OSD_OP_STAT: return "stat";
+
+       case OSD_OP_WRNOOP: return "wrnoop"; 
        case OSD_OP_WRITE: return "write"; 
        case OSD_OP_ZERO: return "zero"; 
        case OSD_OP_DELETE: return "delete"; 
@@ -109,7 +95,9 @@ public:
        case OSD_OP_RDUNLOCK: return "rdunlock"; 
        case OSD_OP_UPLOCK: return "uplock"; 
        case OSD_OP_DNLOCK: return "dnlock"; 
-       case OSD_OP_WRNOOP: return "wrnoop"; 
+
+       case OSD_OP_PULL: return "pull";
+       case OSD_OP_PUSH: return "push";
        default: assert(0);
        }
        return 0;
@@ -118,27 +106,24 @@ public:
 private:
   MOSDOp_st st;
   bufferlist data;
-  //bufferlist osdmap;
+  map<string,bufferptr> attrset;
 
   friend class MOSDOpReply;
 
  public:
-
   const tid_t       get_tid() { return st.tid; }
-  const msg_addr_t& get_asker() { return st.asker; }
+  const msg_addr_t& get_client() { return st.client; }
 
-  const tid_t       get_orig_tid() { return st.orig_tid; }
-  const msg_addr_t& get_orig_asker() { return st.orig_asker; }
-  void set_orig_tid(tid_t t) { st.orig_tid = t; }
-  void set_orig_asker(const msg_addr_t& a) { st.orig_asker = a; }
+  const tid_t       get_rep_tid() { return st.rep_tid; }
+  void set_rep_tid(tid_t t) { st.rep_tid = t; }
 
   const object_t   get_oid() { return st.oid; }
   const pg_t get_pg() { return st.pg; }
   const epoch_t  get_map_epoch() { return st.map_epoch; }
 
-  const int        get_pg_role() { return st.pg_role; }  // who am i asking for?
+  //const int        get_pg_role() { return st.pg_role; }  // who am i asking for?
   const eversion_t  get_version() { return st.version; }
-  const eversion_t  get_old_version() { return st.old_version; }
+  //const eversion_t  get_old_version() { return st.old_version; }
 
   const eversion_t get_pg_trim_to() { return st.pg_trim_to; }
   void set_pg_trim_to(eversion_t v) { st.pg_trim_to = v; }
@@ -149,6 +134,9 @@ private:
   const size_t get_length() { return st.length; }
   const size_t get_offset() { return st.offset; }
 
+  map<string,bufferptr>& get_attrset() { return attrset; }
+  void set_attrset(map<string,bufferptr> &as) { attrset = as; }
+
   const bool wants_ack() { return st.want_ack; }
   const bool wants_commit() { return st.want_commit; }
 
@@ -161,17 +149,6 @@ private:
   }
   size_t get_data_len() { return st._data_len; }
 
-  /*void attach_map(OSDMap *o) {
-       o->encode(osdmap);
-       st._included_map_epoch = o->get_epoch();
-  }
-  epoch_t get_osdmap_epoch() {
-       return st._included_map_epoch;
-  }
-  bufferlist& get_osdmap_bl() { 
-       return osdmap;
-       }*/
-
 
   // keep a pcid (procedure call id) to match up request+reply
   void set_pcid(long pcid) { this->st.pcid = pcid; }
@@ -181,12 +158,12 @@ private:
                 object_t oid, pg_t pg, epoch_t mapepoch, int op) :
        Message(MSG_OSD_OP) {
        memset(&st, 0, sizeof(st));
-       this->st.orig_tid = this->st.tid = tid;
-       this->st.orig_asker = this->st.asker = asker;
+       this->st.client = asker;
+       this->st.tid = tid;
+       this->st.rep_tid = 0;
 
        this->st.oid = oid;
        this->st.pg = pg;
-       this->st.pg_role = 0;
        this->st.map_epoch = mapepoch;
        this->st.op = op;
 
@@ -195,7 +172,7 @@ private:
   }
   MOSDOp() {}
 
-  void set_pg_role(int r) { st.pg_role = r; }
+  //void set_pg_role(int r) { st.pg_role = r; }
   //void set_rg_nrep(int n) { st.rg_nrep = n; }
 
   void set_length(size_t l) { st.length = l; }
@@ -208,17 +185,18 @@ private:
 
   // marshalling
   virtual void decode_payload() {
-       payload.copy(0, sizeof(st), (char*)&st);
-       payload.splice(0, sizeof(st));
-       if (st._data_len) payload.splice(0, st._data_len, &data);
-       //if (st._osdmap_len) payload.splice(0, st._osdmap_len, &osdmap);
+       int off = 0;
+       payload.copy(off, sizeof(st), (char*)&st);
+       off += sizeof(st);
+       ::_decode(attrset, payload, off);
+       if (st._data_len) 
+         payload.splice(off, st._data_len, &data);
   }
   virtual void encode_payload() {
        st._data_len = data.length();
-       //st._osdmap_len = osdmap.length();
        payload.push_back( new buffer((char*)&st, sizeof(st)) );
+       ::_encode(attrset, payload);
        payload.claim_append( data );
-       //payload.claim_append( osdmap );
   }
 
   virtual char *get_type_name() { return "oop"; }
@@ -226,8 +204,8 @@ private:
 
 inline ostream& operator<<(ostream& out, MOSDOp& op)
 {
-  return out << "MOSDOp(" << MSG_ADDR_NICE(op.get_asker()) << "." << op.get_tid() 
-                        << " op " << op.get_op()
+  return out << "MOSDOp(" << MSG_ADDR_NICE(op.get_client()) << "." << op.get_tid() 
+                        << " op " << MOSDOp::get_opname(op.get_op())
                         << " oid " << hex << op.get_oid() << dec << " " << &op << ")";
 }
 
index 397b450339a7bcd01d4653bfe50627d4fcb780ad..e2f4464f5b2733488a7755db64ef4c33fb873110 100644 (file)
@@ -16,7 +16,6 @@
 #define __MOSDOPREPLY_H
 
 #include "msg/Message.h"
-#include "osd/OSDMap.h"
 
 #include "MOSDOp.h"
 #include "osd/ObjectStore.h"
@@ -32,8 +31,9 @@
 
 typedef struct {
   // req
-  long tid;
   long pcid;
+  tid_t tid;
+  tid_t rep_tid;
 
   object_t oid;
   pg_t pg;
@@ -58,11 +58,11 @@ typedef struct {
 class MOSDOpReply : public Message {
   MOSDOpReply_st st;
   bufferlist data;
-  bufferlist osdmap;
   map<string,bufferptr> attrset;
 
  public:
   long     get_tid() { return st.tid; }
+  long     get_rep_tid() { return st.rep_tid; }
   object_t get_oid() { return st.oid; }
   pg_t     get_pg() { return st.pg; }
   int      get_op()  { return st.op; }
@@ -85,6 +85,10 @@ class MOSDOpReply : public Message {
   void set_version(eversion_t v) { st.version = v; }
   void set_attrset(map<string,bufferptr> &as) { attrset = as; }
 
+  void set_op(int op) { st.op = op; }
+  void set_tid(tid_t t) { st.tid = t; }
+  void set_rep_tid(tid_t t) { st.rep_tid = t; }
+
   // data payload
   void set_data(bufferlist &d) {
        data.claim(d);
@@ -101,15 +105,18 @@ class MOSDOpReply : public Message {
   void set_pcid(long pcid) { this->st.pcid = pcid; }
   long get_pcid()          { return st.pcid; }
 
-  MOSDOpReply(MOSDOp *req, int result, epoch_t e, /*OSDMap *oc, */bool commit) :
+public:
+  MOSDOpReply(MOSDOp *req, int result, epoch_t e, bool commit) :
        Message(MSG_OSD_OPREPLY) {
        memset(&st, 0, sizeof(st));
        this->st.pcid = req->st.pcid;
+
+       this->st.op = req->st.op;
        this->st.tid = req->st.tid;
+       this->st.rep_tid = req->st.rep_tid;
 
        this->st.oid = req->st.oid;
        this->st.pg = req->st.pg;
-       this->st.op = req->st.op;
        this->st.result = result;
        this->st.commit = commit;
 
@@ -118,16 +125,6 @@ class MOSDOpReply : public Message {
        this->st.version = req->st.version;
 
        this->st.map_epoch = e;
-
-       // attach updated cluster spec?
-       /*
-       if (oc &&
-               req->get_map_epoch() < oc->get_epoch()) {
-         oc->encode(osdmap);
-         st._new_map_epoch = oc->get_epoch();
-         st._oc_len = osdmap.length();
-       }
-       */
   }
   MOSDOpReply() {}
 
index 6e1110281ad7643e0ad51358cfbae9443a7ec376..d7bd4ea7c57b3eedf18f16ab57ac2dd60945b79b 100644 (file)
@@ -84,6 +84,7 @@ class Messenger {
   virtual void dispatch(Message *m);
 
   // send message
+  virtual void prepare_send_message(msg_addr_t dest) {}
   virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0;
 
   // make a procedure call
index a75e8dee03838f0cb43209cd77dccbd900a8e41f..1010886fb36cbb8e8e63ecfbc216a5d0675fb4c0 100644 (file)
@@ -23,8 +23,8 @@
 
 
 #undef dout
-#define dout(l)  if (l<=g_conf.debug_ms) cout << "-- rank" << rank.my_rank << " "
-#define derr(l)  if (l<=g_conf.debug_ms) cerr << "-- rank" << rank.my_rank << " "
+#define dout(l)  if (l<=g_conf.debug_ms) cout << g_clock.now() << " -- rank" << rank.my_rank << " "
+#define derr(l)  if (l<=g_conf.debug_ms) cerr << g_clock.now() << " -- rank" << rank.my_rank << " "
 
 
 
@@ -1055,6 +1055,43 @@ void Rank::submit_messages(list<Message*>& ls)
   ls.clear();
 }
 
+
+void Rank::prepare_dest(msg_addr_t dest)
+{
+  lock.Lock();
+
+  if (entity_map.count( dest )) {
+       // remote, known rank addr.
+       entity_inst_t inst = entity_map[dest];
+       
+       if (inst == my_inst) {
+         //dout(20) << "submit_message " << *m << " dest " << dest << " local but mid-register, waiting." << endl;
+         //waiting_for_lookup[dest].push_back(m);
+       }
+       else if (rank_sender.count( inst.rank ) &&
+                        rank_sender[inst.rank]->inst == inst) {
+         //dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << inst << ", connected." << endl;
+         // connected.
+         //sender = rank_sender[ inst.rank ];
+       } else {
+         //dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << inst << ", connecting." << endl;
+         // not connected.
+         connect_rank( inst );
+       }
+  } else {
+       // unknown dest rank or rank addr.
+       if (looking_up.count(dest) == 0) {
+         //dout(20) << "submit_message " << *m << " dest " << dest << " remote, unknown addr, looking up" << endl;
+         lookup(dest);
+       } else {
+         //dout(20) << "submit_message " << *m << " dest " << dest << " remote, unknown addr, already looking up" << endl;
+       }
+  }
+
+  lock.Unlock();
+}
+
+
 void Rank::submit_message(Message *m)
 {
   const msg_addr_t dest = m->get_dest();
@@ -1464,6 +1501,13 @@ int Rank::EntityMessenger::shutdown()
   return 0;
 }
 
+
+void Rank::EntityMessenger::prepare_send_message(msg_addr_t dest)
+{
+  rank.prepare_dest(dest);
+}
+
+
 int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
 {
   // set envelope
index 56e0fc7609b4e0dbfde720d54e2996dd7e791579..387348aafb76ee766451861fbc28d8d1ccd804d9 100644 (file)
@@ -185,6 +185,7 @@ class Rank : public Dispatcher {
        
        virtual void callback_kick() {} 
        virtual int shutdown();
+       virtual void prepare_send_message(msg_addr_t dest);
        virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
 
        virtual void mark_down(msg_addr_t a, entity_inst_t& i);
@@ -284,6 +285,7 @@ public:
   EntityMessenger *register_entity(msg_addr_t addr);
   void unregister_entity(EntityMessenger *ms);
 
+  void prepare_dest(msg_addr_t dest);
   void submit_message(Message *m);  
   void submit_messages(list<Message*>& ls);  
 
index 1476bba2094f7b1c997c6f37ab7718d546413773..5e9cd459acf2bed7314585ff09df48943c31b2f2 100644 (file)
@@ -34,6 +34,9 @@ pair<int,int> mpi_bootstrap_new(int& argc, char**& argv)
   }
 
   MPI_Barrier(MPI_COMM_WORLD);
+
+  //g_clock.tare();
+
   MPI_Finalize();
 
   return pair<int,int>(mpi_rank, mpi_world);
index f863718c07115f445fcacee7ace1a57e5013d890..33568b2f8536993407844232e1042719096b4b04 100644 (file)
@@ -281,9 +281,10 @@ int main(int argc, char **argv)
   // wait for everything to finish
   rank.wait();
 
-  if (started) cerr << "tcpsyn finishing" << endl;
+  if (started) cerr << "newsyn finishing" << endl;
+
+  return 0;  // whatever, cleanup hangs sometimes (stopping ebofs threads?).
 
-  return 0;
 
   // cleanup
   for (map<int,MDS*>::iterator i = mds.begin(); i != mds.end(); i++)
index 151104a0a43f45bc232fc7a522de772b71d9e865..4045877d22b664a048ec9cb4fd26d2101f8a975f 100644 (file)
@@ -277,7 +277,7 @@ void Ager::age(int time,
   }
 
   // dump the freelist
-  //save_freelist(0);
+  save_freelist(0);
   exit(0);   // hack
 
   // ok!
index f0540dacfd8693b891b88a5c00c9cbca2de20aab..2a6aa5d1af30cbeb7877785cea8ebaa893e24773 100644 (file)
 
 #include "config.h"
 #undef dout
-#define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << "osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " "
+#define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << g_clock.now() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " "
 
 char *osd_base_path = "./osddata";
 char *ebofs_base_path = "./ebofsdev";
 
-#define ROLE_TYPE(x)   ((x)>0 ? 1:(x))
-
 
 
 
@@ -149,6 +147,7 @@ OSD::OSD(int id, Messenger *m, char *dev)
 
   if (g_conf.ebofs) {
     store = new Ebofs(dev_path);
+       //store->_fake_writes(true);
   }
 #ifdef USE_OBFS
   else if (g_conf.uofs) {
@@ -250,8 +249,8 @@ int OSD::init()
        osd_logtype.add_inc("c_wr");
        osd_logtype.add_inc("c_wrb");
        
-       osd_logtype.add_inc("r_pull");
-       osd_logtype.add_inc("r_pullb");
+       osd_logtype.add_inc("r_push");
+       osd_logtype.add_inc("r_pushb");
        osd_logtype.add_inc("r_wr");
        osd_logtype.add_inc("r_wrb");
        
@@ -287,6 +286,8 @@ int OSD::init()
   }
   osd_lock.Unlock();
 
+  dout(0) << "osd_rep " << g_conf.osd_rep << endl;
+
   return 0;
 }
 
@@ -374,6 +375,7 @@ PG *OSD::_lock_pg(pg_t pgid)
   if (pg_lock.count(pgid)) {
        Cond c;
        dout(15) << "lock_pg " << hex << pgid << dec << " waiting as " << &c << endl;
+       //cerr << "lock_pg " << hex << pgid << dec << " waiting as " << &c << endl;
 
        list<Cond*>& ls = pg_lock_waiters[pgid];   // this is commit, right?
        ls.push_back(&c);
@@ -739,115 +741,6 @@ bool OSD::ms_lookup(msg_addr_t dest, entity_inst_t& inst)
 
 
 
-void OSD::handle_op_reply(MOSDOpReply *m)
-{
-  if (m->get_map_epoch() < boot_epoch) {
-       dout(3) << "replica op reply from before boot" << endl;
-       delete m;
-       return;
-  }
-  
-
-  // handle op
-  switch (m->get_op()) {
-  case OSD_OP_REP_PULL:
-       op_rep_pull_reply(m);
-       break;
-
-  case OSD_OP_REP_WRNOOP:
-  case OSD_OP_REP_WRITE:
-  case OSD_OP_REP_TRUNCATE:
-  case OSD_OP_REP_DELETE:
-  case OSD_OP_REP_WRLOCK:
-  case OSD_OP_REP_WRUNLOCK:
-  case OSD_OP_REP_RDLOCK:
-  case OSD_OP_REP_RDUNLOCK:
-  case OSD_OP_REP_UPLOCK:
-  case OSD_OP_REP_DNLOCK:
-       {
-         const pg_t pgid = m->get_pg();
-         if (pg_map.count(pgid)) {
-               PG *pg = _lock_pg(pgid);
-               assert(pg);
-               handle_rep_op_ack(pg, m->get_tid(), m->get_result(), m->get_commit(), MSG_ADDR_NUM(m->get_source()),
-                                                 m->get_pg_complete_thru());
-               _unlock_pg(pgid);
-         } else {
-               // pg dne!  whatev.
-         }
-         delete m;
-       }
-       break;
-
-  default:
-       assert(0);
-  }
-}
-
-/*
- * NOTE: called holding pg lock      /////osd_lock, opqueue active.
- */
-void OSD::handle_rep_op_ack(PG *pg, __uint64_t tid, int result, bool commit, 
-                                                       int fromosd, eversion_t pg_complete_thru)
-{
-  if (!pg->replica_ops.count(tid)) {
-       dout(7) << "not waiting for repop reply tid " << tid << " in " << *pg 
-                       << ", map must have changed, dropping." << endl;
-       return;
-  }
-  
-  OSDReplicaOp *repop = pg->replica_ops[tid];
-  MOSDOp *op = repop->op;
-
-  dout(7) << "handle_rep_op_ack " << tid << " op " << op
-                 << " result " << result << " commit " << commit << " from osd" << fromosd
-                 << " in " << *pg
-                 << endl;
-
-  /* 
-   * for now, we take a lazy approach to handling replica set changes
-   * that overlap with writes.  replicas with newer maps will reply with
-   * result == -1, but we treat them as a success, and ack the write to
-   * the client.  this means somewhat weakened safety semantics for the client
-   * write, but is much simpler on the osd end.  and no weaker than the rest of the 
-   * data in the PG.. or this same write, if it had completed just before the failure.
-   *
-   * meanwhile, the regular recovery process will handle the object version
-   * mismatch.. the new primary (and others) will pull the latest from the old
-   * primary.  because of the PGLog stuff, it'll be pretty efficient, aside from
-   * the fact that the entire object is copied.
-   *
-   * one optimization: if the rep_write is received by the new primary, they can
-   * (at their discretion) apply it and remove the object from their missing list...
-   * or: if a replica sees tha the old primary is not down, it might assume that its
-   * state will be recovered (ie the new version) and apply the write.
-   */
-  if (1) {  //if (result >= 0) {
-       // success
-       get_repop(repop);
-       {
-         if (commit) {
-               // commit
-               assert(repop->waitfor_commit.count(tid));         
-               repop->waitfor_commit.erase(tid);
-               repop->waitfor_ack.erase(tid);
-
-               repop->pg_complete_thru[fromosd] = pg_complete_thru;
-               
-               pg->replica_ops.erase(tid);
-               pg->replica_tids_by_osd[fromosd].erase(tid);
-               if (pg->replica_tids_by_osd[fromosd].empty()) pg->replica_tids_by_osd.erase(fromosd);
-         } else {
-               // ack
-               repop->waitfor_ack.erase(tid);
-         }
-       }
-       put_repop(repop);
-  }
-
-}
-
-
 
 void OSD::handle_osd_ping(MOSDPing *m)
 {
@@ -983,9 +876,29 @@ void OSD::handle_osd_map(MOSDMap *m)
          for (map<int,entity_inst_t>::iterator i = inc.new_down.begin();
                   i != inc.new_down.end();
                   i++) {
-               if (i->first == whoami) continue;
-               messenger->mark_down(MSG_ADDR_OSD(i->first), i->second);
-               peer_map_epoch.erase(MSG_ADDR_OSD(i->first));
+               int osd = i->first;
+               if (osd == whoami) continue;
+               messenger->mark_down(MSG_ADDR_OSD(osd), i->second);
+               peer_map_epoch.erase(MSG_ADDR_OSD(osd));
+         
+               // kick any replica ops
+               for (hash_map<pg_t,PG*>::iterator it = pg_map.begin();
+                        it != pg_map.end();
+                        it++) {
+                 PG *pg = it->second;
+                 list<PG::RepOpGather*> ls;  // do async; repop_ack() may modify pg->repop_gather
+                 for (map<tid_t,PG::RepOpGather*>::iterator p = pg->repop_gather.begin();
+                          p != pg->repop_gather.end();
+                          p++) {
+                       if (p->second->waitfor_ack.count(osd) ||
+                               p->second->waitfor_commit.count(osd)) 
+                         ls.push_back(p->second);
+                 }
+                 for (list<PG::RepOpGather*>::iterator p = ls.begin();
+                          p != ls.end();
+                          p++)
+                       repop_ack(pg, *p, -1, true, osd);
+               }
          }
          for (map<int,entity_inst_t>::iterator i = inc.new_up.begin();
                   i != inc.new_up.end();
@@ -1058,29 +971,31 @@ void OSD::advance_map(ObjectStore::Transaction& t)
                  << endl;
   
   if (osdmap->is_mkfs()) {
-       dout(1) << "mkfs" << endl;
+       ps_t maxps = 1ULL << osdmap->get_pg_bits();
+       dout(1) << "mkfs on " << osdmap->get_pg_bits() << " bits, " << maxps << " pgs" << endl;
        assert(osdmap->get_epoch() == 1);
 
        //cerr << "osdmap " << osdmap->get_ctime() << " logger start " << logger->get_start() << endl;
        logger->set_start( osdmap->get_ctime() );
 
-       ps_t maxps = 1LL << osdmap->get_pg_bits();
-       
        // create PGs
        for (int nrep = 1; 
                 nrep <= MIN(g_conf.num_osd, g_conf.osd_max_rep);    // for low osd counts..  hackish bleh
                 nrep++) {
          for (pg_t ps = 0; ps < maxps; ps++) {
                pg_t pgid = osdmap->ps_nrep_to_pg(ps, nrep);
-               int role = osdmap->get_pg_acting_role(pgid, whoami);
+               vector<int> acting;
+               int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+               int role = osdmap->calc_pg_role(whoami, acting, nrep);
                if (role < 0) continue;
                
                PG *pg = create_pg(pgid, t);
-               osdmap->pg_to_acting_osds(pgid, pg->acting);
                pg->set_role(role);
+               pg->acting.swap(acting);
                pg->last_epoch_started_any = 
                  pg->info.last_epoch_started = 
                  pg->info.same_primary_since = 
+                 pg->info.same_acker_since = 
                  pg->info.same_role_since = osdmap->get_epoch();
                pg->activate(t);
                
@@ -1089,15 +1004,17 @@ void OSD::advance_map(ObjectStore::Transaction& t)
 
          // local PG too
          pg_t pgid = osdmap->osd_nrep_to_pg(whoami, nrep);
-         int role = osdmap->get_pg_acting_role(pgid, whoami);
-         if (role < 0) continue;
+         vector<int> acting;
+         int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+         int role = osdmap->calc_pg_role(whoami, acting, nrep);
 
          PG *pg = create_pg(pgid, t);
-         osdmap->pg_to_acting_osds(pgid, pg->acting);
+         pg->acting.swap(acting);
          pg->set_role(role);
          pg->last_epoch_started_any = 
                pg->info.last_epoch_started = 
                pg->info.same_primary_since = 
+               pg->info.same_acker_since = 
                pg->info.same_role_since = osdmap->get_epoch();
          pg->activate(t);
                
@@ -1122,33 +1039,37 @@ void OSD::advance_map(ObjectStore::Transaction& t)
          // get new acting set
          vector<int> acting;
          int nrep = osdmap->pg_to_acting_osds(pgid, acting);
-         
-         int primary = -1;
-         if (nrep > 0) primary = acting[0];
-       
-         int role = -1;        // -1, 0, 1
-         for (int i=0; i<nrep; i++) 
-               if (acting[i] == whoami) role = i > 0 ? 1:0;
-         
+         int role = osdmap->calc_pg_role(whoami, acting, nrep);
+
          // no change?
          if (acting == pg->acting) 
                continue;
+
+         int primary = -1;
+         if (nrep > 0) primary = acting[0];
          
-         // primary changed?
          int oldrole = pg->get_role();
          int oldprimary = pg->get_primary();
+         int oldacker = pg->get_acker();
          vector<int> oldacting = pg->acting;
          
          // update PG
-         pg->acting = acting;
-         pg->calc_role(whoami);
+         pg->acting.swap(acting);
+         pg->set_role(role);
          
-         // did primary change?
+         // did primary|acker change?
          if (oldprimary != primary) {
                pg->info.same_primary_since = osdmap->get_epoch();
                pg->cancel_recovery();
          }
-         
+         if (oldacker != pg->get_acker()) {
+               pg->info.same_acker_since = osdmap->get_epoch();
+         }
+         if (oldprimary != primary || oldacker != pg->get_acker()) {
+               // drop our write-ahead log.  (we'll only have on if we were just the acker)
+               pg->trim_write_ahead();
+         }
+
          if (role != oldrole) {
                pg->info.same_role_since = osdmap->get_epoch();
 
@@ -1223,51 +1144,32 @@ void OSD::advance_map(ObjectStore::Transaction& t)
                        dout(10) << *pg << " " << oldacting << " -> " << acting
                                         << ", replicas changed" << endl;
 
+                       // completely restart peering process.
+                       pg->clear_primary_state();
+
+                       /* this is compliated, deal with it later.
                        // clear peer_info for (re-)new replicas
                        for (unsigned i=1; i<acting.size(); i++) {
                          bool had = false;
                          for (unsigned j=1; j<oldacting.size(); j++)
-                               if (acting[i] == oldacting[j]) { 
+                               if (pg->acting[i] == oldacting[j]) { 
                                  had = true; 
                                  break;
                                }
                          if (!had) {
-                               dout(10) << *pg << " hosing any peer state for new replica osd" << acting[i] << endl;
-                               pg->peer_info.erase(acting[i]);
-                               pg->peer_info_requested.erase(acting[i]);
-                               pg->peer_missing.erase(acting[i]);
-                               pg->peer_log_requested.erase(acting[i]);
-                               pg->peer_summary_requested.erase(acting[i]);
+                               dout(10) << *pg << " hosing any peer state for new replica osd" << pg->acting[i] << endl;
+                               pg->peer_info.erase(pg->acting[i]);
+                               pg->peer_info_requested.erase(pg->acting[i]);
+                               pg->peer_missing.erase(pg->acting[i]);
+                               pg->peer_log_requested.erase(pg->acting[i]);
+                               pg->peer_summary_requested.erase(pg->acting[i]);
                          }
                        }
+                       */
                  }
                }
          }
          
-         // scan (FIXME newly!) down osds  
-         for (set<int>::const_iterator down = osdmap->get_down_osds().begin();
-                  down != osdmap->get_down_osds().end();
-                  down++) {
-               if (*down == whoami) continue;
-
-               // old peer?
-               bool have = false;
-               for (unsigned i=0; i<oldacting.size(); i++)
-                 if (oldacting[i] == *down) have = true;
-               if (!have) continue;
-               
-               dout(10) << *pg << " old peer osd" << *down << " is down" << endl;
-               
-               // NAK any ops to the down osd
-               if (pg->replica_tids_by_osd.count(*down)) {
-                 set<__uint64_t> s = pg->replica_tids_by_osd[*down];
-                 dout(10) << " " << *pg << " naking replica ops to down osd" << *down << " " << s << endl;
-                 for (set<__uint64_t>::iterator tid = s.begin();
-                          tid != s.end();
-                          tid++)
-                       handle_rep_op_ack(pg, *tid, -1, true, *down);
-               }
-         }
        }
   }
 }
@@ -1526,11 +1428,8 @@ void OSD::load_pgs()
        pg->read_log(store);
 
        // generate state for current mapping
-       //int nrep = 
-       osdmap->pg_to_acting_osds(pgid, pg->acting);
-       int role = -1;
-       for (unsigned i=0; i<pg->acting.size(); i++)
-         if (pg->acting[i] == whoami) role = i>0 ? 1:0;
+       int nrep = osdmap->pg_to_acting_osds(pgid, pg->acting);
+       int role = osdmap->calc_pg_role(whoami, pg->acting, nrep);
        pg->set_role(role);
 
        dout(10) << "load_pgs loaded " << *pg << " " << pg->log << endl;
@@ -1634,7 +1533,11 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
        if (pg_map.count(pgid) == 0) {
          // check mapping.
          vector<int> acting;
-         osdmap->pg_to_acting_osds(pgid, acting);
+         int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+         if (!nrep) {
+               dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " has null mapping" << endl;
+               continue;
+         }
          
          // am i still the primary?
          assert(it->same_primary_since <= osdmap->get_epoch());
@@ -1654,9 +1557,10 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
          
          // ok, create PG!
          pg = create_pg(pgid, t);
-         pg->acting = acting;
-         pg->info.same_primary_since = it->same_primary_since;
+         pg->acting.swap( acting );
          pg->set_role(0);
+         pg->info.same_primary_since = it->same_primary_since;
+         pg->info.same_acker_since = it->same_acker_since;
          pg->info.same_role_since = osdmap->get_epoch();
 
          pg->last_epoch_started_any = it->last_epoch_started;
@@ -1798,6 +1702,7 @@ void OSD::handle_pg_log(MOSDPGLog *m)
 
        // ok active!
        pg->info.same_primary_since = m->info.same_primary_since;
+       pg->info.same_acker_since = m->info.same_acker_since;
        pg->activate(t);
   }
 
@@ -1832,12 +1737,8 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
        if (pg_map.count(pgid) == 0) {
          // get active rush mapping
          vector<int> acting;
-         //int nrep = 
-         osdmap->pg_to_acting_osds(pgid, acting);
-         //assert(nrep > 0);
-         int role = -1;
-         for (unsigned i=0; i<acting.size(); i++)
-               if (acting[i] == whoami) role = i>0 ? 1:0;
+         int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+         int role = osdmap->calc_pg_role(whoami, acting, nrep);
 
          if (role == 0) {
                dout(10) << " pg " << hex << pgid << dec << " dne, and i am primary.  just waiting for notify." << endl;
@@ -1852,10 +1753,11 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
          
          ObjectStore::Transaction t;
          PG *pg = create_pg(pgid, t);
-         pg->acting = acting;
+         pg->acting.swap( acting );
          pg->set_role(role);
          
-         pg->info.same_primary_since = it->second.same_primary_since;  //calc_pg_primary_since(acting[0], pgid, m->get_epoch());
+         pg->info.same_primary_since = it->second.same_primary_since;
+         pg->info.same_acker_since = it->second.same_acker_since;
          pg->info.same_role_since = osdmap->get_epoch();
 
          t.collection_setattr(pgid, "info", (char*)&pg->info, sizeof(pg->info));
@@ -1965,16 +1867,10 @@ void OSD::handle_pg_remove(MOSDPGRemove *m)
 
 
 
+/*** RECOVERY ***/
 
-// RECOVERY
-
-
-
-
-
-// pull
-
-
+/** pull - request object from a peer
+ */
 void OSD::pull(PG *pg, object_t oid, eversion_t v)
 {
   assert(pg->missing.loc.count(oid));
@@ -1990,9 +1886,8 @@ void OSD::pull(PG *pg, object_t oid, eversion_t v)
   MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
                                                  oid, pg->get_pgid(),
                                                  osdmap->get_epoch(),
-                                                 OSD_OP_REP_PULL);
+                                                 OSD_OP_PULL);
   op->set_version(v);
-  op->set_pg_role(-1);  // whatever, not 0
   messenger->send_message(op, MSG_ADDR_OSD(osd));
   
   // take note
@@ -2002,86 +1897,96 @@ void OSD::pull(PG *pg, object_t oid, eversion_t v)
 }
 
 
-/** op_rep_pull
- * process request to pull an entire object.
- * NOTE: called from opqueue.
+/** push - send object to a peer
  */
-void OSD::op_rep_pull(MOSDOp *op, PG *pg)
+void OSD::push(PG *pg, object_t oid, int dest)
 {
-  const object_t oid = op->get_oid();
-
-  dout(7) << "rep_pull on " << hex << oid << dec << " v >= " << op->get_version() << endl;
-
   // read data+attrs
   bufferlist bl;
   eversion_t v;
   int vlen = sizeof(v);
   map<string,bufferptr> attrset;
-
+  
   ObjectStore::Transaction t;
   t.read(oid, 0, 0, &bl);
   t.getattr(oid, "version", &v, &vlen);
   t.getattrs(oid, attrset);
   unsigned tr = store->apply_transaction(t);
-
-  if (tr != 0) {
-       // reply with -EEXIST
-       dout(7) << "rep_pull don't have " << hex << oid << dec << endl;  
-       MOSDOpReply *reply = new MOSDOpReply(op, -EEXIST, osdmap->get_epoch(), true); 
-       messenger->send_message(reply, op->get_asker());
-       delete op;
-       return;
-  }
   
-  dout(7) << "rep_pull has " 
-                 << hex << op->get_oid() << dec 
-                 << " v " << v << " >= " << op->get_version()
+  assert(tr == 0);  // !!!
+
+  // ok
+  dout(7) << *pg << " push " << hex << oid << dec << " v " << v 
                  << " size " << bl.length()
-                 << " in " << *pg
+                 << " to osd" << dest
                  << endl;
-  assert(v >= op->get_version());
 
-  logger->inc("r_pull");
-  logger->inc("r_pullb", bl.length());
+  logger->inc("r_push");
+  logger->inc("r_pushb", bl.length());
   
-  // reply
-  MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap->get_epoch(), true); 
-  reply->set_result(0);
-  reply->set_length(bl.length());
-  reply->set_data(bl);   // note: claims bl, set length above here!
-  reply->set_offset(0);
-  reply->set_version(v);
-  reply->set_attrset(attrset);
-  
-  messenger->send_message(reply, op->get_asker());
+  // send
+  MOSDOp *op = new MOSDOp(++last_tid, MSG_ADDR_OSD(whoami),
+                                                 oid, pg->info.pgid, osdmap->get_epoch(), 
+                                                 OSD_OP_PUSH); 
+  op->set_offset(0);
+  op->set_length(bl.length());
+  op->set_data(bl);   // note: claims bl, set length above here!
+  op->set_version(v);
+  op->set_attrset(attrset);
   
-  delete op;
+  messenger->send_message(op, MSG_ADDR_OSD(dest));
 }
 
 
-/*
- * NOTE: called holding osd_lock.  opqueue active.
+/** op_pull
+ * process request to pull an entire object.
+ * NOTE: called from opqueue.
  */
-void OSD::op_rep_pull_reply(MOSDOpReply *op)
+void OSD::op_pull(MOSDOp *op, PG *pg)
 {
-  object_t oid = op->get_oid();
-  eversion_t v = op->get_version();
-  pg_t pgid = op->get_pg();
+  const object_t oid = op->get_oid();
+  const eversion_t v = op->get_version();
+  int from = op->get_source().num();
 
-  if (pg_map.count(pgid) == 0) {
-       dout(7) << "rep_pull_reply on pg " << hex << pgid << dec << ", dne" << endl;
+  dout(7) << "op_pull " << hex << oid << dec << " v " << op->get_version()
+                 << " from " << op->get_source()
+                 << endl;
+
+  // am i missing it?
+  if (waitfor_missing_object(op, pg)) {
+       // ok, i better the primary...  (or else there's a map mismatch, or the primary is wrong about my objects, or?)
+       assert(pg->is_primary());
        return;
   }
 
-  PG *pg = _lock_pg(pgid);
+  // is a replica asking?  are they missing it?
+  if (pg->is_primary() &&
+         (pg->peer_missing.count(from) == 0 ||
+          !pg->peer_missing[from].is_missing(oid))) {
+       dout(7) << "op_pull replica isn't actually missing it, we must have already pushed to them" << endl;
+       delete op;
+       return;
+  }
+  
+  // push it back!
+  push(pg, oid, op->get_source().num());
+}
 
-  if (!pg->objects_pulling.count(oid)) {
-       dout(7) << "rep_pull_reply on object " << hex << oid << dec << ", not pulling" << endl; 
-       _unlock_pg(pgid);
+
+/** op_push
+ * NOTE: called from opqueue.
+ */
+void OSD::op_push(MOSDOp *op, PG *pg)
+{
+  object_t oid = op->get_oid();
+  eversion_t v = op->get_version();
+
+  if (!pg->missing.is_missing(oid)) {
+       dout(7) << "op_push not missing object " << hex << oid << dec << endl;
        return;
   }
   
-  dout(7) << "rep_pull_reply " 
+  dout(7) << "op_push " 
                  << hex << oid << dec 
                  << " v " << v 
                  << " size " << op->get_length() << " " << op->get_data().length()
@@ -2089,22 +1994,13 @@ void OSD::op_rep_pull_reply(MOSDOpReply *op)
                  << endl;
 
   assert(op->get_data().length() == op->get_length());
-
+  
   // write object and add it to the PG
   ObjectStore::Transaction t;
   t.remove(oid);  // in case old version exists
   t.write(oid, 0, op->get_length(), op->get_data());
   t.setattrs(oid, op->get_attrset());
-  t.collection_add(pgid, oid);
-
-  // close out pull op.
-  num_pulling--;
-  pg->objects_pulling.erase(oid);
-  pg->missing.got(oid, v);
-
-  // kick waiters
-  if (pg->waiting_for_missing_object.count(oid)) 
-       take_waiters(pg->waiting_for_missing_object[oid]);
+  t.collection_add(pg->info.pgid, oid);
 
   // raise last_complete?
   assert(pg->log.complete_to != pg->log.log.end());
@@ -2116,15 +2012,37 @@ void OSD::op_rep_pull_reply(MOSDOpReply *op)
   }
   dout(10) << *pg << " last_complete now " << pg->info.last_complete << endl;
   
-  // continue
-  pg->do_recovery();
-
   // apply to disk!
-  t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info));
+  t.collection_setattr(pg->info.pgid, "info", &pg->info, sizeof(pg->info));
   unsigned r = store->apply_transaction(t);
   assert(r == 0);
 
-  _unlock_pg(pgid);
+
+  // close out pull op?
+  num_pulling--;
+  if (pg->objects_pulling.count(oid))
+       pg->objects_pulling.erase(oid);
+  pg->missing.got(oid, v);
+
+  // am i primary?  are others missing this too?
+  if (pg->is_primary()) {
+       for (unsigned i=1; i<pg->acting.size(); i++) {
+         int peer = pg->acting[i];
+         if (pg->peer_missing.count(peer) &&
+                 pg->peer_missing[peer].is_missing(oid)) {
+               // ok, push it, and they (will) have it now.
+               pg->peer_missing[peer].got(oid, v);
+               push(pg, oid, peer);
+         }
+       }
+
+       // continue recovery
+       pg->do_recovery();
+  }
+
+  // kick waiters
+  if (pg->waiting_for_missing_object.count(oid)) 
+       take_waiters(pg->waiting_for_missing_object[oid]);
 
   delete op;
 }
@@ -2139,6 +2057,7 @@ class C_OSD_RepModifyCommit : public Context {
 public:
   OSD *osd;
   MOSDOp *op;
+  int destosd;
 
   eversion_t pg_last_complete;
 
@@ -2147,8 +2066,9 @@ public:
   bool acked;
   bool waiting;
 
-  C_OSD_RepModifyCommit(OSD *o, MOSDOp *oo, eversion_t lc) : osd(o), op(oo), pg_last_complete(lc),
-                                                                                                                       acked(false), waiting(false) { }
+  C_OSD_RepModifyCommit(OSD *o, MOSDOp *oo, int dosd, eversion_t lc) : 
+       osd(o), op(oo), destosd(dosd), pg_last_complete(lc),
+       acked(false), waiting(false) { }
   void finish(int r) {
        lock.Lock();
        while (!acked) {
@@ -2157,7 +2077,7 @@ public:
        }
        assert(acked);
        lock.Unlock();
-       osd->op_rep_modify_commit(op, pg_last_complete);
+       osd->op_rep_modify_commit(op, destosd, pg_last_complete);
   }
   void ack() {
        lock.Lock();
@@ -2167,13 +2087,15 @@ public:
   }
 };
 
-void OSD::op_rep_modify_commit(MOSDOp *op, eversion_t last_complete)
+void OSD::op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete)
 {
   // send commit.
-  dout(10) << "rep_modify_commit on op " << *op << endl;
+  dout(10) << "rep_modify_commit on op " << *op
+                  << ", sending commit to osd" << ackerosd
+                  << endl;
   MOSDOpReply *commit = new MOSDOpReply(op, 0, osdmap->get_epoch(), true);
   commit->set_pg_complete_thru(last_complete);
-  messenger->send_message(commit, op->get_asker());
+  messenger->send_message(commit, MSG_ADDR_OSD(ackerosd));
   delete op;
 }
 
@@ -2182,11 +2104,12 @@ void OSD::op_rep_modify_commit(MOSDOp *op, eversion_t last_complete)
 class C_OSD_WriteCommit : public Context {
 public:
   OSD *osd;
-  OSDReplicaOp *repop;
+  pg_t pgid;
+  tid_t rep_tid;
   eversion_t pg_last_complete;
-  C_OSD_WriteCommit(OSD *o, OSDReplicaOp *op, eversion_t lc) : osd(o), repop(op), pg_last_complete(lc) {}
+  C_OSD_WriteCommit(OSD *o, pg_t p, tid_t rt, eversion_t lc) : osd(o), pgid(p), rep_tid(rt), pg_last_complete(lc) {}
   void finish(int r) {
-       osd->op_modify_commit(repop, pg_last_complete);
+       osd->op_modify_commit(pgid, rep_tid, pg_last_complete);
   }
 };
 
@@ -2200,77 +2123,103 @@ void OSD::op_rep_modify(MOSDOp *op, PG *pg)
   object_t oid = op->get_oid();
   eversion_t nv = op->get_version();
 
+  const char *opname = MOSDOp::get_opname(op->get_op());
+  dout(10) << "op_rep_modify " << opname 
+                  << " " << hex << oid << dec 
+                  << " v " << nv 
+                  << " " << op->get_offset() << "~" << op->get_length()
+                  << " in " << *pg
+                  << endl;  
+  
+  // we better not be missing this.
+  assert(!pg->missing.is_missing(oid));
+
+  // prepare our transaction
   ObjectStore::Transaction t;
 
+  // update PG log
   if (op->get_op() != OSD_OP_WRNOOP) {
-       // update PG log
-       if (pg->info.last_update < nv)
-         prepare_log_transaction(t, op, nv, pg, op->get_pg_trim_to());
-       // else, we are playing catch-up, don't update pg metadata!  (FIXME?)
+       prepare_log_transaction(t, op, nv, pg, op->get_pg_trim_to());
+       logger->inc("r_wr");
+       logger->inc("r_wrb", op->get_length());
   }
+
+  // am i acker?
+  PG::RepOpGather *repop = 0;
+  int ackerosd = pg->acting[0];
+
+  if ((g_conf.osd_rep == OSD_REP_CHAIN || g_conf.osd_rep == OSD_REP_SPLAY)) {
+       ackerosd = pg->get_tail();
   
-  // do op?
-  C_OSD_RepModifyCommit *oncommit = 0;
-  
-  // check current version
-  eversion_t myv = 0;
-  store->getattr(oid, "version", &myv, sizeof(myv));  // this is a noop if oid dne
-  dout(10) << "op_rep_modify existing " << hex << oid << dec << " v " << myv << endl;
-
-  // is this an old update?  or WRNOOP?
-  if (nv <= myv || op->get_op() == OSD_OP_WRNOOP) {
-       // we have a newer version.  pretend we do a regular commit!
-       dout(10) << "op_rep_modify on " << hex << oid << dec 
-                        << " v " << nv << " <= myv | wrnoop, noop"
-                        << " in " << *pg 
-                        << endl;
-       oncommit = new C_OSD_RepModifyCommit(this, op,
-                                                                                pg->info.last_complete);
+       if (pg->is_tail()) {
+         // i am tail acker.
+         if (pg->repop_gather.count(op->get_rep_tid())) {
+               repop = pg->repop_gather[ op->get_rep_tid() ];
+         } else {
+               repop = new_repop_gather(pg, op);
+         }
+         
+         // infer ack from source
+         int fromosd = op->get_source().num();
+         get_repop_gather(repop);
+         {
+               //assert(repop->waitfor_ack.count(fromosd));   // no, we may come thru here twice.
+               repop->waitfor_ack.erase(fromosd);
+         }
+         put_repop_gather(pg, repop);
+
+         // prepare dest socket
+         //messenger->prepare_send_message(op->get_client());
+       }
+
+       // chain?  forward?
+       if (g_conf.osd_rep == OSD_REP_CHAIN && !pg->is_tail()) {
+         // chain rep, not at the tail yet.
+         int myrank = osdmap->calc_pg_rank(whoami, pg->acting);
+         issue_repop(pg, op, pg->acting[myrank+1]);
+       }
   }
 
-  // missing?
-  else if (pg->missing.missing.count(oid)) {
-       // old or missing.  wait!
-       dout(10) << "op_rep_modify on " << hex << oid << dec 
-                        << " v " << nv << " > myv, wait"
-                        << " in " << *pg 
-                        << endl;
-       if (pg->missing.missing[oid] > op->get_version())
-         pg->missing.add(oid, op->get_version());  // now we're missing the _newer_ version.
-       waitfor_missing_object(op, pg);
-  } 
-  else {
-       // we're good.
-       dout(10) << "op_rep_modify on " << hex << oid << dec 
-                        << " v " << nv << " (from " << myv << ")"
-                        << " in " << *pg 
-                        << endl;
-       assert(op->get_old_version() == myv);
-       
-       prepare_op_transaction(t, op, nv, pg);
-       oncommit = new C_OSD_RepModifyCommit(this, op,
-                                                                                pg->info.last_complete);
+  // do op?
+  C_OSD_RepModifyCommit *oncommit = 0;
+  if (repop) {
+       // acker.  we'll apply later.
+  } else {
+       // middle|replica.
+       if (op->get_op() != OSD_OP_WRNOOP) 
+         prepare_op_transaction(t, op, nv, pg);
 
-       logger->inc("r_wr");
-       logger->inc("r_wrb", op->get_length());
+       oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, pg->info.last_complete);
   }
 
-  // go
+  // apply log update. and possibly update itself.
   unsigned tr = store->apply_transaction(t, oncommit);
   if (tr != 0 &&   // no errors
          tr != 2) {   // or error on collection_add
        cerr << "error applying transaction: r = " << tr << endl;
        assert(tr == 0);
   }
-
+  
   // ack?
-  if (oncommit) {
-       // ack
-       MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap->get_epoch(), false);
-       messenger->send_message(ack, op->get_asker());
-       oncommit->ack(); 
+  if (repop) {
+       // (logical) local ack.  this may induce the actual update.
+       get_repop_gather(repop);
+       {
+         assert(repop->waitfor_ack.count(whoami));
+         repop->waitfor_ack.erase(whoami);
+       }
+       put_repop_gather(pg, repop);
+  } 
+  else {
+       // send ack to acker?
+       if (g_conf.osd_rep != OSD_REP_CHAIN) {
+         MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap->get_epoch(), false);
+         messenger->send_message(ack, MSG_ADDR_OSD(ackerosd));
+       }
 
-       pg->last_heartbeat = g_clock.now();
+       // ack myself.
+       assert(oncommit);
+       oncommit->ack(); 
   }
 }
 
@@ -2281,16 +2230,16 @@ void OSD::op_rep_modify(MOSDOp *op, PG *pg)
 void OSD::handle_op(MOSDOp *op)
 {
   const pg_t pgid = op->get_pg();
-  int acting_primary = osdmap->get_pg_acting_primary( pgid );
   PG *pg = get_pg(pgid);
 
   // require same or newer map
   if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
 
+  // share our map with sender, if they're old
   _share_map_incoming(op->get_source(), op->get_map_epoch());
 
   // what kind of op?
-  if (!OSD_OP_IS_REP(op->get_op())) {
+  if (!op->get_source().is_osd()) {
        // REGULAR OP (non-replication)
 
        // have pg?
@@ -2301,17 +2250,31 @@ void OSD::handle_op(MOSDOp *op)
          waiting_for_pg[pgid].push_back(op);
          return;
        }
-               
-       // am i the (same) primary?
-       if (acting_primary != whoami ||
-               op->get_map_epoch() < pg->info.same_primary_since) {
-         dout(7) << "acting primary is osd" << acting_primary
-                         << " since " << pg->info.same_primary_since 
-                         << ", dropping" << endl;
-         assert(op->get_map_epoch() < osdmap->get_epoch());
-         return;
+       
+       bool read = op->get_op() < 10;
+
+       if (read) {
+         // read. am i the (same) acker?
+         if (pg->get_acker() != whoami ||
+                 op->get_map_epoch() < pg->info.same_acker_since) {
+               dout(7) << "acting acker is osd" << pg->get_acker()
+                               << " since " << pg->info.same_acker_since 
+                               << ", dropping" << endl;
+               assert(op->get_map_epoch() < osdmap->get_epoch());
+               return;
+         }
+       } else {
+         // write. am i the (same) primary?
+         if (pg->get_primary() != whoami ||
+                 op->get_map_epoch() < pg->info.same_primary_since) {
+               dout(7) << "acting primary is osd" << pg->get_primary()
+                               << " since " << pg->info.same_primary_since 
+                               << ", dropping" << endl;
+               assert(op->get_map_epoch() < osdmap->get_epoch());
+               return;
+         }
        }
-
+       
        // must be active.
        if (!pg->is_active()) {
          // replay?
@@ -2330,38 +2293,32 @@ void OSD::handle_op(MOSDOp *op)
          pg->waiting_for_active.push_back(op);
          return;
        }
-
+       
        dout(7) << "handle_op " << op << " in " << *pg << endl;
        
   } else {
-       // REPLICATION OP
+       // REPLICATION OP (it's from another OSD)
 
        // have pg?
        if (!pg) {
          dout(7) << "handle_rep_op " << op 
-                         << " in pgid " << hex << pgid << dec << endl;
-         waiting_for_pg[pgid].push_back(op);
+                         << " pgid " << hex << pgid << dec << " dne" << endl;
+         delete op;
          return;
        }
-
-    // check osd map.  same primary?
+       
+    // check osd map: same primary+acker?
        if (op->get_map_epoch() != osdmap->get_epoch()) {
-         // make sure source is still primary
-         const int myrole = pg->get_role();  //osdmap->get_pg_acting_role(op->get_pg(), whoami);
-         
-         if (acting_primary != MSG_ADDR_NUM(op->get_source()) ||
-                 myrole <= 0 ||
-                 op->get_map_epoch() < pg->info.same_primary_since) {
-               dout(5) << "op map " << op->get_map_epoch() << " != " << osdmap->get_epoch()
-                               << ", primary changed on pg " << hex << op->get_pg() << dec
-                               << endl;
-               MOSDOpReply *fail = new MOSDOpReply(op, -EAGAIN, osdmap->get_epoch(), true);  // FIXME error code?
-               messenger->send_message(fail, op->get_asker());
+         if (op->get_map_epoch() < pg->info.same_primary_since ||
+                 op->get_map_epoch() < pg->info.same_acker_since) {
+               // drop message.
+               delete op;
                return;
          }
-         
-         dout(5) << "op map " << op->get_map_epoch() << " != " << osdmap->get_epoch()
-                         << ", primary same on pg " << hex << op->get_pg() << dec
+         assert(pg->get_role() >= 0);
+
+         dout(5) << "handle_rep_op map " << op->get_map_epoch() << " != " << osdmap->get_epoch()
+                         << ", but primary+acker same in " << *pg
                          << endl;
        }
        
@@ -2375,11 +2332,44 @@ void OSD::handle_op(MOSDOp *op)
   }
 }
 
+void OSD::handle_op_reply(MOSDOpReply *op)
+{
+  if (op->get_map_epoch() < boot_epoch) {
+       dout(3) << "replica op reply from before boot" << endl;
+       delete op;
+       return;
+  }
+
+  // must be a rep op.
+  assert(op->get_source().is_osd());
+  
+  // make sure we have the pg
+  const pg_t pgid = op->get_pg();
+  PG *pg = get_pg(pgid);
+
+  // require same or newer map
+  if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
+
+  // share our map with sender, if they're old
+  _share_map_incoming(op->get_source(), op->get_map_epoch());
+
+  if (!pg) {
+       // hmm.
+       delete op;
+  }
+
+  if (g_conf.osd_maxthreads < 1) {
+       do_op(op, pg); // do it now
+  } else {
+       enqueue_op(pgid, op);   // queue for worker threads
+  }
+}
+
 
 /*
  * enqueue called with osd_lock held
  */
-void OSD::enqueue_op(pg_t pgid, MOSDOp *op)
+void OSD::enqueue_op(pg_t pgid, Message *op)
 {
   while (pending_ops > g_conf.osd_max_opq) {
        dout(10) << "enqueue_op waiting for pending_ops " << pending_ops << " to drop to " << g_conf.osd_max_opq << endl;
@@ -2398,7 +2388,7 @@ void OSD::enqueue_op(pg_t pgid, MOSDOp *op)
  */
 void OSD::dequeue_op(pg_t pgid)
 {
-  MOSDOp *op;
+  Message *op;
   PG *pg;
 
   osd_lock.Lock();
@@ -2407,7 +2397,7 @@ void OSD::dequeue_op(pg_t pgid)
        pg = _lock_pg(pgid);  
 
        // get pending op
-       list<MOSDOp*> &ls  = op_queue[pgid];
+       list<Message*> &ls  = op_queue[pgid];
        assert(!ls.empty());
        op = ls.front();
        ls.pop_front();
@@ -2449,48 +2439,34 @@ void OSD::dequeue_op(pg_t pgid)
  * object lock will be held (if multithreaded)
  * osd_lock NOT held.
  */
-void OSD::do_op(MOSDOp *op, PG *pg) 
+void OSD::do_op(Message *m, PG *pg) 
 {
-  dout(10) << "do_op " << *op
-       //<< " on " << hex << op->get_oid() << dec
-                  << " in " << *pg << endl;
+  //dout(15) << "do_op " << *op << " in " << *pg << endl;
+
+  if (m->get_type() == MSG_OSD_OP) {
+       MOSDOp *op = (MOSDOp*)m;
 
-  logger->inc("op");
+       logger->inc("op");
 
-  // replication ops?
-  if (OSD_OP_IS_REP(op->get_op())) {
-       // replication/recovery
        switch (op->get_op()) {
-       case OSD_OP_REP_PULL:
-         op_rep_pull(op, pg);
+         
+         // rep stuff
+       case OSD_OP_PULL:
+         op_pull(op, pg);
          break;
-
-         // replicated ops
-       case OSD_OP_REP_WRNOOP:
-       case OSD_OP_REP_WRITE:
-       case OSD_OP_REP_TRUNCATE:
-       case OSD_OP_REP_DELETE:
-       case OSD_OP_REP_WRLOCK:
-       case OSD_OP_REP_WRUNLOCK:
-       case OSD_OP_REP_RDLOCK:
-       case OSD_OP_REP_RDUNLOCK:
-       case OSD_OP_REP_UPLOCK:
-       case OSD_OP_REP_DNLOCK:
-         op_rep_modify(op, pg);
+       case OSD_OP_PUSH:
+         op_push(op, pg);
          break;
-
-       default:
-         assert(0);      
-       }
-  } else {
-       // regular op
-       switch (op->get_op()) {
+         
+         // reads
        case OSD_OP_READ:
          op_read(op, pg);
          break;
        case OSD_OP_STAT:
          op_stat(op, pg);
          break;
+         
+         // writes
        case OSD_OP_WRNOOP:
        case OSD_OP_WRITE:
        case OSD_OP_ZERO:
@@ -2502,15 +2478,40 @@ void OSD::do_op(MOSDOp *op, PG *pg)
        case OSD_OP_RDUNLOCK:
        case OSD_OP_UPLOCK:
        case OSD_OP_DNLOCK:
-         op_modify(op, pg);
+         if (op->get_source().is_osd()) 
+               op_rep_modify(op, pg);
+         else
+               op_modify(op, pg);
          break;
+         
        default:
          assert(0);
        }
-  }
+  } 
+  else if (m->get_type() == MSG_OSD_OPREPLY) {
+       // must be replication.
+       MOSDOpReply *r = (MOSDOpReply*)m;
+       tid_t rep_tid = r->get_rep_tid();
+  
+       if (pg->repop_gather.count(rep_tid)) {
+         // oh, good.
+         int fromosd = r->get_source().num();
+         repop_ack(pg, pg->repop_gather[rep_tid], 
+                               r->get_result(), r->get_commit(), 
+                               fromosd, 
+                               r->get_pg_complete_thru());
+         delete m;
+       } else {
+         // early ack.
+         pg->waiting_for_repop[rep_tid].push_back(r);
+       }
 
+  } else
+       assert(0);
 }
 
+
+
 void OSD::wait_for_no_ops()
 {
   if (pending_ops > 0) {
@@ -2541,7 +2542,7 @@ bool OSD::block_if_wrlocked(MOSDOp* op)
   //cout << "getattr returns " << len << " on " << hex << oid << dec << endl;
 
   if (len == sizeof(source) &&
-         source != op->get_asker()) {
+         source != op->get_client()) {
        //the object is locked for writing by someone else -- add the op to the waiting queue     
        waiting_for_wr_unlock[oid].push_back(op);
        return true;
@@ -2628,13 +2629,13 @@ void OSD::op_read(MOSDOp *op, PG *pg)
        reply->set_length(0);
   }
   
-  dout(12) << " read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
+  dout(10) << " read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
   
   logger->inc("rd");
   if (got >= 0) logger->inc("rdb", got);
   
   // send it
-  messenger->send_message(reply, op->get_asker());
+  messenger->send_message(reply, op->get_client());
   
   delete op;
 }
@@ -2657,11 +2658,15 @@ void OSD::op_stat(MOSDOp *op, PG *pg)
   memset(&st, sizeof(st), 0);
   int r = store->stat(oid, &st);
   
-  dout(3) << "stat on " << hex << oid << dec << " r = " << r << " size = " << st.st_size << endl;
+  dout(3) << "op_stat on " << hex << oid << dec 
+                 << " r = " << r
+                 << " size = " << st.st_size
+                 << " in " << *pg
+                 << endl;
   
   MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap->get_epoch(), true);
   reply->set_object_size(st.st_size);
-  messenger->send_message(reply, op->get_asker());
+  messenger->send_message(reply, op->get_client());
   
   logger->inc("stat");
 
@@ -2670,73 +2675,46 @@ void OSD::op_stat(MOSDOp *op, PG *pg)
 
 
 
-// WRITE OPS
-
-
-
-void OSD::issue_replica_op(PG *pg, OSDReplicaOp *repop, int osd)
-{
-  MOSDOp *op = repop->op;
-  object_t oid = op->get_oid();
-
-  dout(7) << " issue_replica_op in " << *pg << " o " << hex << oid << dec << " to osd" << osd << endl;
-  
-  // forward the write/update/whatever
-  __uint64_t tid = ++last_tid;
-  MOSDOp *wr = new MOSDOp(tid,
-                                                 messenger->get_myaddr(),
-                                                 oid,
-                                                 pg->get_pgid(),
-                                                 osdmap->get_epoch(),
-                                                 100+op->get_op());
-  wr->get_data() = op->get_data();   // copy bufferlist
-  wr->set_length(op->get_length());
-  wr->set_offset(op->get_offset());
-  wr->set_version(repop->new_version);
-  wr->set_old_version(repop->old_version);
-  wr->set_pg_role(1); // replica
-  wr->set_pg_trim_to(pg->peers_complete_thru);
-  wr->set_orig_asker(op->get_asker());
-  wr->set_orig_tid(op->get_tid());
-  messenger->send_message(wr, MSG_ADDR_OSD(osd));
-  
-  repop->osds.insert(osd);
-
-  repop->waitfor_ack[tid] = osd;
-  repop->waitfor_commit[tid] = osd;
-  
-  //replica_ops[tid] = repop;
-  //replica_pg_osd_tids[pg->get_pgid()][osd].insert(tid);
-  pg->replica_ops[tid] = repop;
-  pg->replica_tids_by_osd[osd].insert(tid);
-}
-
+/*********
+ * new repops
+ */
 
-void OSD::get_repop(OSDReplicaOp *repop)
+void OSD::get_repop_gather(PG::RepOpGather *repop)
 {
-  repop->lock.Lock();
+  //repop->lock.Lock();
   dout(10) << "get_repop " << *repop << endl;
 }
 
-void OSD::put_repop(OSDReplicaOp *repop)
+void OSD::put_repop_gather(PG *pg, PG::RepOpGather *repop)
 {
   dout(10) << "put_repop " << *repop << endl;
 
   // commit?
   if (repop->can_send_commit() &&
          repop->op->wants_commit()) {
+       // send commit.
        MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap->get_epoch(), true);
        dout(10) << "put_repop sending commit on " << *repop << " " << reply << endl;
-       messenger->send_message(reply, repop->op->get_asker());
+       messenger->send_message(reply, repop->op->get_client());
        repop->sent_commit = true;
   }
 
   // ack?
   else if (repop->can_send_ack() &&
                   repop->op->wants_ack()) {
+       // apply
+       dout(10) << "put_repop applying update on " << *repop << endl;
+       ObjectStore::Transaction t;
+       prepare_op_transaction(t, repop->op, repop->new_version, pg);
+       Context *oncommit = new C_OSD_WriteCommit(this, pg->info.pgid, repop->rep_tid, repop->pg_local_last_complete);
+       unsigned r = store->apply_transaction(t, oncommit);
+       if (r)
+         dout(-10) << "put_repop apply transaction return " << r << " on " << *repop << endl;
+
+       // send ack
        MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap->get_epoch(), false);
        dout(10) << "put_repop sending ack on " << *repop << " " << reply << endl;
-       messenger->send_message(reply, repop->op->get_asker());
+       messenger->send_message(reply, repop->op->get_client());
        repop->sent_ack = true;
 
        utime_t now = g_clock.now();
@@ -2749,47 +2727,175 @@ void OSD::put_repop(OSDReplicaOp *repop)
   if (repop->can_delete()) {
        // adjust peers_complete_thru
        if (!repop->pg_complete_thru.empty()) {
-         pg_t pgid = repop->op->get_pg();
-         osd_lock.Lock();
-         PG *pg = get_pg(pgid);
-         if (pg) {
-               eversion_t min = pg->info.last_complete;  // hrm....
-               for (unsigned i=1; i<pg->acting.size(); i++) {
-                 if (repop->pg_complete_thru[i] < min)      // note: if we haven't heard, it'll be zero, which is what we want.
-                       min = repop->pg_complete_thru[i];
-               }
-
-               if (min > pg->peers_complete_thru) {
-                 dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
-                 pg->peers_complete_thru = min;
-               }
-               //_unlock_pg(pgid);
+         eversion_t min = pg->info.last_complete;  // hrm....
+         for (unsigned i=1; i<pg->acting.size(); i++) {
+               if (repop->pg_complete_thru[i] < min)      // note: if we haven't heard, it'll be zero, which is what we want.
+                 min = repop->pg_complete_thru[i];
+         }
+         
+         if (min > pg->peers_complete_thru) {
+               dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
+               pg->peers_complete_thru = min;
          }
-         osd_lock.Unlock();
        }
 
        dout(10) << "put_repop deleting " << *repop << endl;
-       repop->lock.Unlock();  
+       //repop->lock.Unlock();  
        delete repop->op;
        delete repop;
   } else {
-       repop->lock.Unlock();
+       //repop->lock.Unlock();
   }
 }
 
 
-void OSD::op_modify_commit(OSDReplicaOp *repop, eversion_t pg_complete_thru)
+void OSD::issue_repop(PG *pg, MOSDOp *op, int osd)
 {
-  dout(10) << "op_modify_commit on op " << *repop->op << endl;
-  get_repop(repop);
+  object_t oid = op->get_oid();
+
+  dout(7) << " issue_repop rep_tid " << op->get_rep_tid()
+                 << " in " << *pg 
+                 << " o " << hex << oid << dec
+                 << " to osd" << osd
+                 << endl;
+  
+  // forward the write/update/whatever
+  MOSDOp *wr = new MOSDOp(op->get_tid(),
+                                                 op->get_client(),
+                                                 oid,
+                                                 pg->get_pgid(),
+                                                 osdmap->get_epoch(),
+                                                 op->get_op());
+  wr->get_data() = op->get_data();   // _copy_ bufferlist
+  wr->set_length(op->get_length());
+  wr->set_offset(op->get_offset());
+  wr->set_version(op->get_version());
+
+  wr->set_rep_tid(op->get_rep_tid());
+  wr->set_pg_trim_to(pg->peers_complete_thru);
+
+  messenger->send_message(wr, MSG_ADDR_OSD(osd));
+}
+
+PG::RepOpGather *OSD::new_repop_gather(PG *pg, 
+                                                                          MOSDOp *op)
+{
+  dout(10) << "new_repop_gather rep_tid " << op->get_rep_tid() << " on " << *op << " in " << *pg << endl;
+
+  PG::RepOpGather *repop = new PG::RepOpGather(op, op->get_rep_tid(), 
+                                                                                          op->get_version(), 
+                                                                                          pg->info.last_complete);
+
+  // osds. commits all come to me.
+  for (unsigned i=0; i<pg->acting.size(); i++) {
+       int osd = pg->acting[i];
+       repop->osds.insert(osd);
+       repop->waitfor_commit.insert(osd);
+  }
+
+  // acks vary:
+  if (g_conf.osd_rep == OSD_REP_CHAIN) {
+       // chain rep. 
+       // there's my local ack...
+       repop->osds.insert(whoami);
+       repop->waitfor_ack.insert(whoami);
+       repop->waitfor_commit.insert(whoami);
+
+       // also, the previous guy will ack to me
+       int myrank = osdmap->calc_pg_rank(whoami, pg->acting);
+       if (myrank > 0) {
+         int osd = pg->acting[ myrank-1 ];
+         repop->osds.insert(osd);
+         repop->waitfor_ack.insert(osd);
+         repop->waitfor_commit.insert(osd);
+       }
+  } else {
+       // primary, splay.  all osds ack to me.
+       for (unsigned i=0; i<pg->acting.size(); i++) {
+         int osd = pg->acting[i];
+         repop->waitfor_ack.insert(osd);
+       }
+  }
+
+  repop->start = g_clock.now();
+
+  pg->repop_gather[ repop->rep_tid ] = repop;
+
+  // anyone waiting?  (acks that got here before the op did)
+  if (pg->waiting_for_repop.count(repop->rep_tid)) {
+       take_waiters(pg->waiting_for_repop[repop->rep_tid]);
+       pg->waiting_for_repop.erase(repop->rep_tid);
+  }
+
+  return repop;
+}
+
+void OSD::repop_ack(PG *pg, PG::RepOpGather *repop,
+                                       int result, bool commit,
+                                       int fromosd, eversion_t pg_complete_thru)
+{
+  MOSDOp *op = repop->op;
+
+  dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *op
+                 << " result " << result << " commit " << commit << " from osd" << fromosd
+                 << " in " << *pg
+                 << endl;
+
+  get_repop_gather(repop);
   {
-       assert(repop->waitfor_commit.count(0));
-       repop->waitfor_commit.erase(0);
-       repop->pg_complete_thru[whoami] = pg_complete_thru;
+       if (commit) {
+         // commit
+         assert(repop->waitfor_commit.count(fromosd));   
+         repop->waitfor_commit.erase(fromosd);
+         repop->waitfor_ack.erase(fromosd);
+         
+         repop->pg_complete_thru[fromosd] = pg_complete_thru;
+         
+         if (repop->waitfor_commit.empty()) {
+               pg->repop_gather.erase(repop->rep_tid);
+         }
+       } else {
+         // ack
+         repop->waitfor_ack.erase(fromosd);
+       }
   }
-  put_repop(repop);
+  put_repop_gather(pg, repop);
 }
 
+
+
+
+
+/** op_modify_commit
+ * transaction commit on the acker.
+ */
+void OSD::op_modify_commit(pg_t pgid, tid_t rep_tid, eversion_t pg_complete_thru)
+{
+  PG *pg = lock_pg(pgid);
+  if (pg) {
+       if (pg->repop_gather.count(rep_tid)) {
+         PG::RepOpGather *repop = pg->repop_gather[rep_tid];
+         
+         dout(10) << "op_modify_commit " << *repop->op << endl;
+         get_repop_gather(repop);
+         {
+               assert(repop->waitfor_commit.count(whoami));
+               repop->waitfor_commit.erase(whoami);
+               repop->pg_complete_thru[whoami] = pg_complete_thru;
+         }
+         put_repop_gather(pg, repop);
+       } else {
+         dout(10) << "op_modify_commit pg " << hex << pgid << dec << " rep_tid " << rep_tid << " dne" << endl;
+       }
+
+       unlock_pg(pgid);
+  } else {
+       dout(10) << "op_modify_commit pg " << hex << pgid << dec << " dne" << endl;
+  }
+}
+
+
 /** op_modify
  * process client modify op
  * NOTE: called from opqueue.
@@ -2803,8 +2909,19 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
   // missing?
   if (waitfor_missing_object(op, pg)) return;
   
+  // are any peers missing this?
+  for (unsigned i=1; i<pg->acting.size(); i++) {
+       int peer = pg->acting[i];
+       if (pg->peer_missing.count(i) &&
+               pg->peer_missing[peer].is_missing(oid)) {
+         // push it before this update.  FIXME, this is probably extra much work (eg if we're about to overwrite)
+         pg->peer_missing[peer].got(oid);
+         push(pg, oid, peer);
+       }
+  }
+
   // dup op?
-  reqid_t reqid(op->get_asker(), op->get_tid());
+  reqid_t reqid(op->get_client(), op->get_tid());
   if (pg->log.logged_req(reqid)) {
        dout(-3) << "op_modify " << opname << " dup op " << reqid
                         << ", doing WRNOOP" << endl;
@@ -2819,38 +2936,30 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
        return; // op will be handled later, after the object unlocks
 
 
-  // old version
-  eversion_t ov = 0;  // 0 == dne (yet)
-  store->getattr(oid, "version", &ov, sizeof(ov));
-  
-  // new version
-  eversion_t nv;
-  if (op->get_op() == OSD_OP_WRNOOP)
-       nv = ov;
-  else {
-       nv = pg->info.last_update;
+  // assign version
+  eversion_t nv = pg->log.top;
+  if (op->get_op() != OSD_OP_WRNOOP) {
        nv.epoch = osdmap->get_epoch();
        nv.version++;
        assert(nv > pg->info.last_update);
-       assert(nv > ov);
+       assert(nv > pg->log.top);
 
        if (op->get_version().version) {
          // replay
          if (nv.version < op->get_version().version)
                nv.version = op->get_version().version; 
        } 
-
-       // set version in op, for benefit of client and our eventual reply
-       op->set_version(nv);
   }
+
+  // set version in op, for benefit of client and our eventual reply
+  op->set_version(nv);
   
   dout(10) << "op_modify " << opname 
                   << " " << hex << oid << dec 
                   << " v " << nv 
-                  << " ov " << ov 
-                  << "  off " << op->get_offset() << " len " << op->get_length() 
+                  << " " << op->get_offset() << "~" << op->get_length()
                   << endl;  
+
   // share latest osd map?
   osd_lock.Lock();
   {
@@ -2858,47 +2967,69 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
          _share_map_outgoing( MSG_ADDR_OSD(i) ); 
   }
   osd_lock.Unlock();
-  
+
   // issue replica writes
-  OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov);
-  repop->start = g_clock.now();
-  repop->waitfor_ack[0] = whoami;    // will need local ack, commit
-  repop->waitfor_commit[0] = whoami;
-  
-  repop->lock.Lock();
-  {
-       for (unsigned i=1; i<pg->acting.size(); i++)
-         issue_replica_op(pg, repop, pg->acting[i]);
+  PG::RepOpGather *repop = 0;
+  bool alone = (pg->acting.size() == 1);
+  tid_t rep_tid = ++last_tid;
+  op->set_rep_tid(rep_tid);
+
+  if (g_conf.osd_rep == OSD_REP_CHAIN && !alone) {
+       // chain rep.  send to #2 only.
+       issue_repop(pg, op, pg->acting[1]);
+  } 
+  else if (g_conf.osd_rep == OSD_REP_SPLAY && !alone) {
+       // splay rep.  send to rest.
+       for (unsigned i=1; i<pg->acting.size(); ++i)
+       //for (unsigned i=pg->acting.size()-1; i>=1; --i)
+         issue_repop(pg, op, pg->acting[i]);
+  } else {
+       // primary rep, or alone.
+       repop = new_repop_gather(pg, op);
+
+       // send to rest.
+       if (!alone)
+         for (unsigned i=1; i<pg->acting.size(); i++)
+               issue_repop(pg, op, pg->acting[i]);
   }
-  repop->lock.Unlock();
 
-  // prepare
-  ObjectStore::Transaction t;
-  if (op->get_op() != OSD_OP_WRNOOP) {
+  if (repop) { 
+       // we are acker.
+       if (op->get_op() != OSD_OP_WRNOOP) {
+         // log now
+         ObjectStore::Transaction t;
+         prepare_log_transaction(t, op, nv, pg, pg->peers_complete_thru);
+         store->apply_transaction(t);
+
+         // update later.
+       }
+
+       // (logical) local ack.
+       // (if alone, this will apply the update.)
+       get_repop_gather(repop);
+       {
+         assert(repop->waitfor_ack.count(whoami));
+         repop->waitfor_ack.erase(whoami);
+       }
+       put_repop_gather(pg, repop);
+
+  } else {
+       // chain or splay.  apply.
+       ObjectStore::Transaction t;
        prepare_log_transaction(t, op, nv, pg, pg->peers_complete_thru);
        prepare_op_transaction(t, op, nv, pg);
+
+       C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, pg->get_tail(), 
+                                                                                                                               pg->info.last_complete);
+       unsigned r = store->apply_transaction(t, oncommit);
+       if (r != 0 &&   // no errors
+               r != 2) {   // or error on collection_add
+         cerr << "error applying transaction: r = " << r << endl;
+         assert(r == 0);
+       }
+
+       oncommit->ack();
   }
-  
-  // apply
-  Context *oncommit = new C_OSD_WriteCommit(this, repop, pg->info.last_complete);
-  unsigned r = store->apply_transaction(t, oncommit);
-  if (r != 0 &&   // no errors
-         r != 2) {   // or error on collection_add
-       cerr << "error applying transaction: r = " << r << endl;
-       assert(r == 0);
-  }
-
-  // pre-ack
-  //MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap->get_epoch(), false);
-  //messenger->send_message(reply, op->get_asker());
-  // local ack
-  get_repop(repop);
-  {
-       assert(repop->waitfor_ack.count(0));
-       repop->waitfor_ack.erase(0);
-  }
-  put_repop(repop);
 
   if (op->get_op() == OSD_OP_WRITE) {
        logger->inc("c_wr");
@@ -2913,13 +3044,11 @@ void OSD::prepare_log_transaction(ObjectStore::Transaction& t,
                                                                  eversion_t trim_to)
 {
   const object_t oid = op->get_oid();
-  const pg_t pgid = op->get_pg();
   
   int opcode = PG::Log::Entry::UPDATE;
-  if (op->get_op() == OSD_OP_DELETE ||
-         op->get_op() == OSD_OP_REP_DELETE) opcode = PG::Log::Entry::DELETE;
+  if (op->get_op() == OSD_OP_DELETE) opcode = PG::Log::Entry::DELETE;
   PG::Log::Entry logentry(opcode, op->get_oid(), version,
-                                                 op->get_orig_asker(), op->get_orig_tid());
+                                                 op->get_client(), op->get_tid());
 
   dout(10) << "prepare_log_transaction " << op->get_op()
                   << (logentry.is_delete() ? " - ":" + ")
@@ -2927,22 +3056,13 @@ void OSD::prepare_log_transaction(ObjectStore::Transaction& t,
                   << " v " << version
                   << " in " << *pg << endl;
 
-  // raise last_complete?
-  if (pg->info.last_complete == pg->log.top)
-       pg->info.last_complete = version;
-  
-  // update pg log
+  // append to log
   assert(version > pg->log.top);
-  assert(pg->info.last_update == pg->log.top);
   pg->log.add(logentry);
   assert(pg->log.top == version);
-  pg->info.last_update = version;
 
-  // write to pg log
+  // write to pg log on disk
   pg->append_log(t, logentry, trim_to);
-  
-  // write pg info
-  t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info));
 }
 
 
@@ -2960,18 +3080,28 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t,
                   << " v " << version
                   << " in " << *pg << endl;
   
-  // the op
+  // raise last_complete?
+  if (pg->info.last_complete == pg->info.last_update)
+       pg->info.last_complete = version;
+  
+  // raise last_update.
+  assert(version > pg->info.last_update);
+  pg->info.last_update = version;
+
+  // write pg info
+  t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info));
+
+
+  // apply the op
   switch (op->get_op()) {
   case OSD_OP_WRLOCK:
-  case OSD_OP_REP_WRLOCK:
        { // lock object
          //r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit);
-         t.setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t));
+         t.setattr(oid, "wrlock", &op->get_client(), sizeof(msg_addr_t));
        }
        break;  
        
   case OSD_OP_WRUNLOCK:
-  case OSD_OP_REP_WRUNLOCK:
        { // unlock objects
          //r = store->rmattr(oid, "wrlock", oncommit);
          t.rmattr(oid, "wrlock");
@@ -2985,7 +3115,6 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t,
        break;
        
   case OSD_OP_WRITE:
-  case OSD_OP_REP_WRITE:
        { // write
          assert(op->get_data().length() == op->get_length());
          bufferlist bl;
@@ -2996,7 +3125,6 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t,
        break;
        
   case OSD_OP_TRUNCATE:
-  case OSD_OP_REP_TRUNCATE:
        { // truncate
          //r = store->truncate(oid, op->get_offset());
          t.truncate(oid, op->get_length() );
@@ -3004,7 +3132,6 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t,
        break;
        
   case OSD_OP_DELETE:
-  case OSD_OP_REP_DELETE:
        { // delete
          //r = store->remove(oid);
          t.remove(oid);
@@ -3016,8 +3143,7 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t,
   }
   
   // object collection, version
-  if (op->get_op() == OSD_OP_DELETE ||
-         op->get_op() == OSD_OP_REP_DELETE) {
+  if (op->get_op() == OSD_OP_DELETE) {
        // remove object from c
        t.collection_remove(pgid, oid);
   } else {
index 32959d74046164ee85e7d31d4909cf526b1b60ec..03fdfbede52388e8a1bd099ddf4f9d384a3755f7 100644 (file)
@@ -139,7 +139,7 @@ public:
 
   // -- ops --
   class ThreadPool<class OSD*, object_t> *threadpool;
-  hash_map<pg_t, list<MOSDOp*> >         op_queue;
+  hash_map<pg_t, list<Message*> >         op_queue;
   int   pending_ops;
   bool  waiting_for_no_ops;
   Cond  no_pending_ops;
@@ -147,13 +147,13 @@ public:
   
   void wait_for_no_ops();
 
-  void enqueue_op(pg_t pgid, MOSDOp *op);
+  void enqueue_op(pg_t pgid, Message *op);
   void dequeue_op(pg_t pgid);
   static void static_dequeueop(OSD *o, pg_t pgid) {
        o->dequeue_op(pgid);
   };
 
-  void do_op(class MOSDOp *m, PG *pg);  // actually do it
+  void do_op(Message *m, PG *pg);  // actually do it
 
   void prepare_log_transaction(ObjectStore::Transaction& t, MOSDOp* op, eversion_t& version, PG *pg, eversion_t trim_to);
   void prepare_op_transaction(ObjectStore::Transaction& t, MOSDOp* op, eversion_t& version, PG *pg);
@@ -221,11 +221,15 @@ public:
   hash_map<pg_t, list<Message*> >        waiting_for_pg;
 
   // replica ops
-  void get_repop(OSDReplicaOp*);
-  void put_repop(OSDReplicaOp*);   // will send ack/commit msgs, and delete as necessary.
-  void issue_replica_op(PG *pg, OSDReplicaOp *repop, int osd);
-  void handle_rep_op_ack(PG *pg, __uint64_t tid, int result, bool commit, int fromosd, 
-                                                eversion_t pg_complete_thru=0);
+  void get_repop_gather(PG::RepOpGather*);
+  void put_repop_gather(PG *pg, PG::RepOpGather*);
+  void issue_repop(PG *pg, MOSDOp *op, int osd);
+  PG::RepOpGather *new_repop_gather(PG *pg, MOSDOp *op);
+  void repop_ack(PG *pg, PG::RepOpGather *repop,
+                                int result, bool commit,
+                                int fromosd, eversion_t pg_complete_thru=0);
+  
+  void handle_rep_op_ack(MOSDOpReply *m);
 
   // recovery
   void do_notifies(map< int, list<PG::Info> >& notify_list);
@@ -233,6 +237,7 @@ public:
   void repeer(PG *pg, map< int, map<pg_t,PG::Query> >& query_map);
 
   void pull(PG *pg, object_t, eversion_t);
+  void push(PG *pg, object_t oid, int dest);
 
   bool require_current_map(Message *m, epoch_t v);
   bool require_same_or_newer_map(Message *m, epoch_t e);
@@ -242,11 +247,12 @@ public:
   void handle_pg_log(class MOSDPGLog *m);
   void handle_pg_remove(class MOSDPGRemove *m);
 
-  void op_rep_pull(class MOSDOp *op, PG *pg);
-  void op_rep_pull_reply(class MOSDOpReply *op);
+  void op_pull(class MOSDOp *op, PG *pg);
+  void op_push(class MOSDOp *op, PG *pg);
   
   void op_rep_modify(class MOSDOp *op, PG *pg);   // write, trucnate, delete
-  void op_rep_modify_commit(class MOSDOp *op, eversion_t last_complete);
+  void op_rep_modify_commit(class MOSDOp *op, int ackerosd, 
+                                                       eversion_t last_complete);
   friend class C_OSD_RepModifyCommit;
 
 
@@ -269,7 +275,7 @@ public:
   void op_read(class MOSDOp *m, PG *pg);
   void op_stat(class MOSDOp *m, PG *pg);
   void op_modify(class MOSDOp *m, PG *pg);
-  void op_modify_commit(class OSDReplicaOp *repop, eversion_t last_complete);
+  void op_modify_commit(pg_t pgid, tid_t rep_tid, eversion_t pg_complete_thru);
 
   // for replication
   void handle_op_reply(class MOSDOpReply *m);
index fa5b19ec51c8189a87a8741f490862a193ba3876..97d31944ddc0f545b35705ceabb3fc8631d46ea4 100644 (file)
@@ -49,6 +49,11 @@ using namespace std;
 #define PG_TYPE_RAND     1   // default: distribution randomly
 #define PG_TYPE_STARTOSD 2   // place primary on a specific OSD (named by the pg_bits)
 
+// pg roles
+#define PG_ROLE_STRAY   -1
+#define PG_ROLE_HEAD     0
+#define PG_ROLE_MIDDLE   1
+#define PG_ROLE_TAIL     2
 
 
 
@@ -437,35 +442,49 @@ private:
          return group[0];
        return -1;  // we fail!
   }
+  int get_pg_acting_tail(pg_t pg) {
+       vector<int> group;
+       int nrep = pg_to_acting_osds(pg, group);
+       if (nrep > 0)
+         return group[group.size()-1];
+       return -1;  // we fail!
+  }
 
 
   /* what replica # is a given osd? 0 primary, -1 for none. */
+  int calc_pg_rank(int osd, vector<int>& acting, int nrep=0) {
+       if (!nrep) nrep = acting.size();
+       for (int i=0; i<nrep; i++) 
+         if (acting[i] == osd) return i;
+       return -1;
+  }
+  int calc_pg_role(int osd, vector<int>& acting, int nrep=0) {
+       if (!nrep) nrep = acting.size();
+       int rank = calc_pg_rank(osd, acting, nrep);
+
+       if (rank < 0) return PG_ROLE_STRAY;
+       else if (rank == 0) return PG_ROLE_HEAD;
+       else if (rank == nrep-1) return PG_ROLE_TAIL;
+       else return PG_ROLE_MIDDLE;
+  }
+  
   int get_pg_role(pg_t pg, int osd) {
        vector<int> group;
        int nrep = pg_to_osds(pg, group);
-       for (int i=0; i<nrep; i++) {
-         if (group[i] == osd) return i;
-       }
-       return -1;  // none
+       return calc_pg_role(osd, group, nrep);
   }
 
   /* rank is -1 (stray), 0 (primary), 1,2,3,... (replica) */
   int get_pg_acting_rank(pg_t pg, int osd) {
        vector<int> group;
        int nrep = pg_to_acting_osds(pg, group);
-       for (int i=0; i<nrep; i++) {
-         if (group[i] == osd) return i;
-       }
-       return -1;  // none
+       return calc_pg_rank(osd, group, nrep);
   }
   /* role is -1 (stray), 0 (primary), 1 (replica) */
   int get_pg_acting_role(pg_t pg, int osd) {
        vector<int> group;
        int nrep = pg_to_acting_osds(pg, group);
-       for (int i=0; i<nrep; i++) {
-         if (group[i] == osd) return i>0 ? 1:0;
-       }
-       return -1;  // none
+       return calc_pg_role(osd, group, nrep);
   }
 
 
index b9bb4853aeb9afc2283af0df0b8ce099ebc86d34..1fd065ef0d9c5fa171c9bf7e886a687268f5f814 100644 (file)
@@ -69,6 +69,31 @@ void PG::IndexedLog::trim(ObjectStore::Transaction& t, eversion_t s)
 }
 
 
+void PG::IndexedLog::trim_write_ahead(eversion_t last_update) 
+{
+  while (!log.empty() &&
+                log.rbegin()->version > last_update) {
+       // remove from index
+       unindex(*log.rbegin());
+       
+       // remove
+       log.pop_back();
+  }
+}
+
+void PG::trim_write_ahead()
+{
+  if (info.last_update < log.top) {
+       dout(10) << "trim_write_ahead (" << info.last_update << "," << log.top << "]" << endl;
+       log.trim_write_ahead(info.last_update);
+  } else {
+       assert(info.last_update == log.top);
+       dout(10) << "trim_write_ahead last_update=top=" << info.last_update << endl;
+  }
+
+}
+
+
 void PG::merge_log(Log &olog, Missing &omissing, int fromosd)
 {
   dout(10) << "merge_log " << olog << " from osd" << fromosd
@@ -366,7 +391,7 @@ void PG::peer(ObjectStore::Transaction& t,
        }
        
        dout(10) << " querying info from osd" << *it << endl;
-       query_map[*it][info.pgid] = Query(Query::INFO, info.same_primary_since);
+       query_map[*it][info.pgid] = Query(Query::INFO, info.same_primary_since, info.same_acker_since);
        peer_info_requested.insert(*it);
   }
   if (missing_info) return;
@@ -454,7 +479,8 @@ void PG::peer(ObjectStore::Transaction& t,
                                 << " v " << newest_update 
                                 << ", querying since " << oldest_update_needed
                                 << endl;
-               query_map[newest_update_osd][info.pgid] = Query(Query::LOG, oldest_update_needed, info.same_primary_since);
+               query_map[newest_update_osd][info.pgid] = Query(Query::LOG, oldest_update_needed, 
+                                                                                                               info.same_primary_since, info.same_acker_since);
                peer_log_requested[newest_update_osd] = oldest_update_needed;
          } else {
                dout(10) << " newest update on osd" << newest_update_osd
@@ -464,7 +490,8 @@ void PG::peer(ObjectStore::Transaction& t,
                assert((peer_info[newest_update_osd].last_complete >= 
                                peer_info[newest_update_osd].log_bottom) ||
                           peer_info[newest_update_osd].log_backlog);  // or else we're in trouble.
-               query_map[newest_update_osd][info.pgid] = Query(Query::BACKLOG, info.same_primary_since);
+               query_map[newest_update_osd][info.pgid] = Query(Query::BACKLOG, 
+                                                                                                               info.same_primary_since, info.same_acker_since);
                peer_summary_requested.insert(newest_update_osd);
          }
        }
@@ -505,7 +532,8 @@ void PG::peer(ObjectStore::Transaction& t,
                           << ".  fetching summary/backlog from osd" << who
                           << endl;
          assert(who != osd->whoami); // can't be me, or we're in trouble.
-         query_map[who][info.pgid] = Query(Query::BACKLOG, info.same_primary_since);
+         query_map[who][info.pgid] = Query(Query::BACKLOG, 
+                                                                               info.same_primary_since, info.same_acker_since);
          peer_summary_requested.insert(who);
        }
        return;
@@ -529,7 +557,9 @@ void PG::peer(ObjectStore::Transaction& t,
          }
 
          dout(10) << " requesting summary/backlog from osd" << peer << endl;     
-         query_map[peer][info.pgid] = Query(Query::INFO, info.same_primary_since);
+         query_map[peer][info.pgid] = Query(Query::INFO, 
+                                                                                info.same_primary_since,
+                                                                                info.same_acker_since);
          peer_summary_requested.insert(peer);
          waiting = true;
        }
@@ -598,9 +628,10 @@ void PG::activate(ObjectStore::Transaction& t)
        dout(10) << "activate - complete" << endl;
        log.complete_to == log.log.end();
        log.requested_to = log.log.end();
-  } else {
-       dout(10) << "activate - not complete, starting recovery" << endl;
-
+  } 
+  else if (is_primary()) {
+       dout(10) << "activate - not complete, " << missing << ", starting recovery" << endl;
+       
        // init complete_to
        log.complete_to = log.log.begin();
        while (log.complete_to->version < info.last_complete) {
@@ -611,6 +642,8 @@ void PG::activate(ObjectStore::Transaction& t)
        // start recovery
        log.requested_to = log.complete_to;
     do_recovery();
+  } else {
+       dout(10) << "activate - not complete, " << missing << endl;
   }
 
 
@@ -649,24 +682,6 @@ void PG::activate(ObjectStore::Transaction& t)
                m->log.copy_after(log, peer_info[peer].last_update);
          }
          
-         // build missing list for them too
-         set<object_t> did;
-         for (list<Log::Entry>::reverse_iterator p = m->log.log.rbegin();
-                  p != m->log.log.rend();
-                  p++) {
-               if (p->is_delete()) continue;
-               if (did.count(p->oid)) continue;
-               did.insert(p->oid);
-               
-               if (missing.is_missing(p->oid, p->version)) {   // we don't have it?
-                 assert(missing.loc.count(p->oid));   // nothing should be lost!
-                 m->missing.add(p->oid, p->version);
-                 m->missing.loc[p->oid] = missing.loc[p->oid];
-               } 
-               // note: peer will assume we have it if we don't say otherwise.
-               // else m->missing.loc[oid] = osd->whoami;       // we have it.
-         }
-         
          dout(10) << "sending " << m->log << " " << m->missing
                           << " to osd" << peer << endl;
          
@@ -860,6 +875,35 @@ bool PG::do_recovery()
   return false;
 }
 
+void PG::do_peer_recovery()
+{
+  dout(10) << "do_peer_recovery" << endl;
+
+  for (unsigned i=0; i<acting.size(); i++) {
+       int peer = acting[i];
+       if (peer_missing.count(peer) == 0 ||
+               peer_missing[peer].num_missing() == 0) 
+         continue;
+       
+       // oldest first!
+       object_t oid = peer_missing[peer].rmissing.begin()->second;
+       eversion_t v = peer_missing[peer].rmissing.begin()->first;
+
+       osd->push(this, oid, peer);
+
+       // do other peers need it too?
+       for (i++; i<acting.size(); i++) {
+         int peer = acting[i];
+         if (peer_missing.count(peer) &&
+                 peer_missing[peer].is_missing(oid))
+               osd->push(this, oid, peer);
+       }
+
+       return;
+  }
+  
+  // nothing to do!
+}
 
 
 
index b7dcff9f46ffeacdbf5e1bce863f7f835ab7b6bf..6c5c1f9cbf14a79c68aee2f8a2595a13a7046713 100644 (file)
@@ -21,6 +21,7 @@
 #include "OSDMap.h"
 #include "ObjectStore.h"
 #include "msg/Messenger.h"
+#include "messages/MOSDOpReply.h"
 
 #include "include/types.h"
 
@@ -81,19 +82,23 @@ public:
    */
   struct Info {
        pg_t pgid;
-       eversion_t last_update;    // (aka log.top) last object version logged/updated.
+       eversion_t last_update;    // last object version applied to store.
        eversion_t last_complete;  // last version pg was complete through.
+
        eversion_t log_bottom;     // oldest log entry.
-       bool      log_backlog;    // do we store a complete log?
+       bool       log_backlog;    // do we store a complete log?
+
        epoch_t last_epoch_started;  // last epoch started.
        epoch_t last_epoch_finished; // last epoch finished.
        epoch_t same_primary_since;  // upper bound: same primary at least back through this epoch.
+       epoch_t same_acker_since;    // upper bound: same acker at least back through this epoch.
        epoch_t same_role_since;     // upper bound: i have held same role since
+
        Info(pg_t p=0) : pgid(p), 
-                                          //last_update(0), last_complete(0), log_bottom(0), 
-                                          log_backlog(false),
-                                          last_epoch_started(0), last_epoch_finished(0),
-                                          same_primary_since(0), same_role_since(0) {}
+                                        log_backlog(false),
+                                        last_epoch_started(0), last_epoch_finished(0),
+                                        same_primary_since(0), same_acker_since(0), 
+                                        same_role_since(0) {}
        bool is_clean() { return last_update == last_complete; }
   };
 
@@ -112,10 +117,15 @@ public:
        int type;
        eversion_t version;
        epoch_t same_primary_since;
-
-       Query() : type(-1), same_primary_since(0) {}
-       Query(int t, epoch_t s) : type(t), same_primary_since(s) {}
-       Query(int t, eversion_t v, epoch_t s) : type(t), version(v), same_primary_since(s) {}
+       epoch_t same_acker_since;
+
+       Query() : type(-1), same_primary_since(0), same_acker_since(0) {}
+       Query(int t, epoch_t ps, epoch_t as) : 
+         type(t), 
+         same_primary_since(ps), same_acker_since(as) {}
+       Query(int t, eversion_t v, epoch_t ps, epoch_t as) : 
+         type(t), version(v), 
+         same_primary_since(ps), same_acker_since(as) {}
   };
   
   
@@ -162,6 +172,12 @@ public:
          rmissing.erase(missing[oid]);
          missing.erase(oid);
        }
+       void got(object_t oid) {
+         assert(missing.count(oid));
+         loc.erase(oid);
+         rmissing.erase(missing[oid]);
+         missing.erase(oid);
+       }
 
        void _encode(bufferlist& blist) {
          ::_encode(missing, blist);
@@ -329,6 +345,7 @@ public:
        }
 
        void trim(ObjectStore::Transaction &t, eversion_t s);
+       void trim_write_ahead(eversion_t last_update);
   };
   
 
@@ -348,6 +365,46 @@ public:
   };
 
 
+  /***
+   */
+
+  class RepOpGather {
+  public:
+       class MOSDOp *op;
+       tid_t rep_tid;
+
+       set<int>  waitfor_ack;
+       set<int>  waitfor_commit;
+       
+       utime_t   start;
+
+       bool sent_ack, sent_commit;
+       
+       set<int>         osds;
+       eversion_t       new_version;
+
+       eversion_t       pg_local_last_complete;
+       map<int,eversion_t> pg_complete_thru;
+       
+       RepOpGather(MOSDOp *o, tid_t rt, eversion_t nv, eversion_t lc) :
+         op(o), rep_tid(rt),
+         sent_ack(false), sent_commit(false),
+         new_version(nv), 
+         pg_local_last_complete(lc) { }
+       bool can_send_ack() { 
+         return !sent_ack && !sent_commit &&
+               waitfor_ack.empty(); 
+       }
+       bool can_send_commit() { 
+         return !sent_commit &&
+               waitfor_ack.empty() && waitfor_commit.empty(); 
+       }
+       bool can_delete() { 
+         return waitfor_ack.empty() && waitfor_commit.empty(); 
+       }
+  };
+
+
   /*** PG ****/
 public:
   // any
@@ -396,10 +453,17 @@ protected:
   set<int>             peer_summary_requested;
   friend class OSD;
 
+
+  // [primary|tail]
+  // old way
   map<tid_t, class OSDReplicaOp*> replica_ops;
   map<int, set<tid_t> >           replica_tids_by_osd; // osd -> (tid,...)
 
+  // new way
+  map<tid_t, RepOpGather*>          repop_gather;
+  map<tid_t, list<class Message*> > waiting_for_repop;
 
+  
   // [primary|replica]
   // pg waiters
   list<class Message*>            waiting_for_active;
@@ -411,19 +475,22 @@ protected:
   map<object_t, eversion_t> objects_pulling;  // which objects are currently being pulled
   
 public:
-  void clear_primary_recovery_state() {
-       peer_info.clear();
-       peer_missing.clear();
-  }
   void clear_primary_state() {
+       // clear peering state
        prior_set.clear();
        stray_set.clear();
        clean_set.clear();
        peer_info_requested.clear();
        peer_log_requested.clear();
-       clear_primary_recovery_state();
+       peer_info.clear();
+       peer_missing.clear();
+  }
+
+  void clear_acker_state() {
+       
   }
 
+
  public:
   bool is_acting(int osd) const { 
        for (unsigned i=0; i<acting.size(); i++)
@@ -453,6 +520,8 @@ public:
   void merge_log(Log &olog, Missing& omissing, int from);
   void generate_backlog();
   void drop_backlog();
+  
+  void trim_write_ahead();
 
   void peer(ObjectStore::Transaction& t, map< int, map<pg_t,Query> >& query_map);
 
@@ -460,6 +529,7 @@ public:
 
   void cancel_recovery();
   bool do_recovery();
+  void do_peer_recovery();
 
   void clean_replicas();
 
@@ -479,18 +549,20 @@ public:
   { }
   
   pg_t       get_pgid() const { return info.pgid; }
-  int        get_primary() { return acting[0]; }
   int        get_nrep() const { return acting.size(); }
 
+  int        get_primary() { return acting.empty() ? -1:acting[0]; }
+  int        get_tail() { return acting.empty() ? -1:acting[ acting.size()-1 ]; }
+  int        get_acker() { return g_conf.osd_rep == OSD_REP_PRIMARY ? get_primary():get_tail(); }
+  
   int        get_role() const { return role; }
   void       set_role(int r) { role = r; }
-  void       calc_role(int whoami) {
-       role = -1;
-       for (unsigned i=0; i<acting.size(); i++)
-         if (acting[i] == whoami) role = i>0 ? 1:0;
-  }
-  bool       is_primary() const { return role == 0; }
-  bool       is_residual() const { return role < 0; }
+
+  bool       is_primary() const { return role == PG_ROLE_HEAD; }
+  bool       is_head() const { return role == PG_ROLE_HEAD; }
+  bool       is_tail() const { return role == PG_ROLE_TAIL; }
+  bool       is_middle() const { return role == PG_ROLE_MIDDLE; }
+  bool       is_residual() const { return role == PG_ROLE_STRAY; }
   
   //int  get_state() const { return state; }
   bool state_test(int m) const { return (state & m) != 0; }
@@ -580,4 +652,17 @@ inline ostream& operator<<(ostream& out, const PG& pg)
 }
 
 
+inline ostream& operator<<(ostream& out, PG::RepOpGather& repop)
+{
+  out << "repop(rep_tid=" << repop.rep_tid 
+         << " wfack=" << repop.waitfor_ack
+         << " wfcommit=" << repop.waitfor_commit;
+  out << " pct=" << repop.pg_complete_thru;
+  out << " op=" << *(repop.op);
+  out << " repop=" << &repop;
+  out << ")";
+  return out;
+}
+
+
 #endif
index 25d4d4e58a50bb017151e49809f4d1e0d5daf21d..08860bd7a973654b86b6d09f9d39ffb904bd4a33 100644 (file)
@@ -108,33 +108,42 @@ void Objecter::scan_pgs(set<pg_t>& changed_pgs)
        pg_t pgid = i->first;
        PG& pg = i->second;
 
-       int old = pg.primary;
-       pg.calc_primary(pgid, osdmap);
-
-       if (old != pg.primary) {
-         if (old < 0) {
+       int oldu = pg.updater;
+       int oldr = pg.reader;
+       pg.calc(pgid, osdmap);
+
+       if (oldu != pg.updater ||
+               oldr != pg.reader) {
+         /*
+         if (oldu < 0) {
                dout(10) << "scan_pgs pg " << hex << pgid << dec 
                                 << " (" << pg.active_tids << ")"
-                                << " primary " << old << " -> " << pg.primary
+                                << " updater " << oldu << " -> " << pg.updater
                                 << " (was crashed)"
                                 << endl;
                //recovering_pgs.insert(pgid);
          }
-         else if (osdmap->is_down(old)) {
+         else if (osdmap->is_down(oldu)) {
                dout(10) << "scan_pgs pg " << hex << pgid << dec 
                                 << " (" << pg.active_tids << ")"
-                                << " primary " << old << " -> " << pg.primary
-                                << " (primary went down)"
+                                << " updater " << oldu << " -> " << pg.updater
+                                << " (updater went down)"
                                 << endl;
                //down_pgs.insert(pgid);
          } 
          else {
                dout(10) << "scan_pgs pg " << hex << pgid << dec 
                                 << " (" << pg.active_tids << ")"
-                                << " primary " << old << " -> " << pg.primary
+                                << " updater " << oldu << " -> " << pg.updater
                                 << " (primary changed)"
                                 << endl;
          }
+         */
+         dout(10) << "scan_pgs pg " << hex << pgid << dec 
+                          << " (" << pg.active_tids << ")"
+                          << " updater " << oldu << " -> " << pg.updater
+                          << ", reader " << oldr << " -> " << pg.reader
+                          << endl;
          changed_pgs.insert(pgid);
        }
   }
@@ -265,11 +274,11 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex)
                   << " oid " << hex << ex.oid << dec  << " " << ex.start << "~" << ex.length
                   << " (" << ex.buffer_extents.size() << " buffer fragments)" 
                   << " pg " << hex << ex.pgid << dec
-                  << " osd" << pg.primary 
+                  << " osd" << pg.reader 
                   << endl;
 
-  if (pg.primary >= 0) 
-       messenger->send_message(m, MSG_ADDR_OSD(pg.primary), 0);
+  if (pg.reader >= 0) 
+       messenger->send_message(m, MSG_ADDR_OSD(pg.reader), 0);
        
   // add to gather set
   rd->ops[last_tid] = ex;
@@ -557,10 +566,10 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
                   << "  oid " << hex << ex.oid << dec 
                   << " " << ex.start << "~" << ex.length 
                   << " pg " << hex << ex.pgid << dec 
-                  << " osd" << pg.primary 
+                  << " osd" << pg.updater 
                   << endl;
-  if (pg.primary >= 0)
-       messenger->send_message(m, MSG_ADDR_OSD(pg.primary), 0);
+  if (pg.updater >= 0)
+       messenger->send_message(m, MSG_ADDR_OSD(pg.updater), 0);
   
   return tid;
 }
@@ -592,8 +601,8 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
   PG &pg = get_pg( m->get_pg() );
 
   // ignore?
-  if (pg.primary != m->get_source().num()) {
-       dout(7) << " ignoring ack|commit from non-primary" << endl;
+  if (pg.reader != m->get_source().num()) {
+       dout(7) << " ignoring ack|commit from non-acker" << endl;
        delete m;
        return;
   }
index 309b0911690d69168553a16c011af091098e0ee2..902d270ed7fdc34d31aa58ec8ba7e3d676a264df 100644 (file)
@@ -4,6 +4,7 @@
 #include "include/types.h"
 #include "include/bufferlist.h"
 
+#include "osd/OSDMap.h"
 #include "messages/MOSDOp.h"
 
 #include <list>
@@ -88,13 +89,18 @@ class Objecter {
    */
   class PG {
   public:
-       int primary;             // current osd set
+       int updater;         // where i write
+       int reader;          // where i read, and expect acks from
        set<tid_t>  active_tids; // active ops
 
-       PG() : primary(-1) {}
+       PG() : updater(-1), reader(-1) {}
 
-       void calc_primary(pg_t pgid, OSDMap *osdmap) {  // return true if change
-         primary = osdmap->get_pg_acting_primary(pgid);
+       void calc(pg_t pgid, OSDMap *osdmap) {  // return true if change
+         updater = osdmap->get_pg_acting_primary(pgid);
+         if (g_conf.osd_rep == OSD_REP_PRIMARY)
+               reader = updater;
+         else 
+               reader = osdmap->get_pg_acting_tail(pgid);
        }
   };
 
@@ -103,7 +109,7 @@ class Objecter {
   
   PG &get_pg(pg_t pgid) {
        if (!pg_map.count(pgid)) 
-         pg_map[pgid].calc_primary(pgid, osdmap);
+         pg_map[pgid].calc(pgid, osdmap);
        return pg_map[pgid];
   }
   void close_pg(pg_t pgid) {