]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 13 Aug 2005 02:55:13 +0000 (02:55 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 13 Aug 2005 02:55:13 +0000 (02:55 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@488 29311d96-e01e-0410-9327-a35deaab8ce9

15 files changed:
ceph/Makefile
ceph/client/Client.cc
ceph/config.cc
ceph/config.h
ceph/include/buffer.h
ceph/include/types.h
ceph/mds/MDS.cc
ceph/messages/MOSDOp.h
ceph/messages/MOSDOpReply.h
ceph/osd/FakeStore.cc
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/OSDMap.h
ceph/osd/RG.cc [new file with mode: 0644]
ceph/osd/RG.h [new file with mode: 0644]

index 64ffe0af7b94ff59a8eed176ce06babbe6b49a82..dce8985adc3b3ab5c98b468c0755b32749f0b312 100644 (file)
@@ -45,6 +45,7 @@ COMMON_OBJS= \
        osd/FakeStore.o\
        osd/Filer.o\
        osd/OSDMap.o\
+       osd/RG.o\
        osd/rush.o\
        common/Logger.o\
        common/Clock.o\
index cfaf13e6b4c07fe63cb0d0230f1c1adac2345f2f..75539c060c4098d1005bca4a1d65194f28bf11c6 100644 (file)
@@ -42,8 +42,6 @@ Client::Client(Messenger *m)
 {
   // which client am i?
   whoami = MSG_ADDR_NUM(m->get_myaddr());
-  cout << "i am client " << whoami <<  " " << MSG_ADDR_NICE(m->get_myaddr()) << endl;
-
 
   mounted = false;
 
index 48f2743571a63063799a081862227476c7e7c080..1aaca7ba49235843193c42fe1273908ee5953f2b 100644 (file)
@@ -47,6 +47,7 @@ md_config_t g_conf = {
 
   fake_clock: false,
   fakemessenger_serialize: true,
+  fake_osdmap_expand: false,
 
   debug: 1,
   debug_mds_balancer: 1,
@@ -180,6 +181,9 @@ void parse_config_options(vector<char*>& args)
        else if (strcmp(args[i], "--numosd") == 0) 
          g_conf.num_osd = atoi(args[++i]);
 
+       else if (strcmp(args[i], "--fake_osdmap_expand") == 0) 
+         g_conf.fake_osdmap_expand = atoi(args[++i]);
+
        else if (strcmp(args[i], "--debug") == 0) 
          g_conf.debug = atoi(args[++i]);
        else if (strcmp(args[i], "--debug_mds_balancer") == 0) 
index d7cec57097a858f8db700de8f0f6aaf5635eda57..7a0c445404dd89cb6e2aa00d7e9ee1db0e1ca62d 100644 (file)
@@ -25,6 +25,8 @@ struct md_config_t {
   bool fake_clock;
   bool fakemessenger_serialize;
 
+  bool fake_osdmap_expand;
+
   int debug;
   int debug_mds_balancer;
   int debug_mds_log;
@@ -74,6 +76,7 @@ struct md_config_t {
   bool  mds_commit_on_shutdown;
   bool  mds_verify_export_dirauth;     // debug flag
 
+
   // osd
   int   osd_num_rg;
   bool  osd_fsync;
index b3b5619e23bea56019c4f9e51069c56956ea4c84..85fd2a9b6d1e5ecd173eff78ef213930961a6c41 100644 (file)
@@ -193,12 +193,12 @@ class bufferptr {
        _buffer(other._buffer),
        _len(other._len),
        _off(other._off) {
-       _buffer->_get();        
+       if (_buffer) _buffer->_get();   
   }
 
   // assignment operator
   bufferptr& operator=(const bufferptr& other) {
-       assert(0);
+       //assert(0);
        // discard old
        discard_buffer();
 
@@ -206,7 +206,8 @@ class bufferptr {
        _buffer = other._buffer;
        _len = other._len;
        _off = other._off;
-       _buffer->_get();
+       if (_buffer) _buffer->_get();
+       return *this;
   }
 
   ~bufferptr() {
index 701d4b71c98d3bcf526bfee48b43bd12f5acec2b..d538b49f5e157143f8aada29421847c8cc9ec4bd 100644 (file)
@@ -130,6 +130,8 @@ struct FileLayout {
 
 // -- inode --
 
+typedef __uint64_t version_t;
+
 typedef __uint64_t inodeno_t;   // ino
 
 #define INODE_MODE_FILE     0100000 // S_IFREG
@@ -147,20 +149,20 @@ struct inode_t {
   time_t    ctime;
 
   // hard (permissions)
-  mode_t mode;
-  uid_t  uid;
-  gid_t  gid;
+  mode_t     mode;
+  uid_t      uid;
+  gid_t      gid;
+  FileLayout layout;  
 
   // soft
   __uint64_t size;
   time_t     atime, mtime;      // maybe atime different?  "lazy"?
+  int        nlink;
 
   // special stuff
-  FileLayout    layout;  
-  unsigned char hash_seed;  // 0 if not hashed.
-  int           nlink;
-  bool          anchored;
-  __uint64_t    file_data_version;
+  unsigned char hash_seed;         // only defined for dir; 0 if not hashed.
+  bool          anchored;          // auth only
+  version_t     file_data_version; // auth only
 };
 
 
@@ -171,6 +173,16 @@ typedef __uint64_t coll_t;        // collection id
 
 #define RG_NONE    0xffffffffffffffffLL
 
+struct onode_t {
+  object_t    oid;
+  repgroup_t  rgid;
+  version_t   version;
+  size_t      size;
+  //time_t      ctime, mtime;
+};
+
+
+
 // client types
 typedef int        fh_t;          // file handle 
 
index aeca08e77ac1d88b72a2b7523e3c970899cd9ae7..35d296f04b732963224704fb2e093af0e041cf29 100644 (file)
@@ -546,7 +546,7 @@ void MDS::my_dispatch(Message *m)
 
 
        // HACK osd map change
-       if (false) {
+       if (g_conf.fake_osdmap_expand) {
          static int didit = 0;
          if (whoami == 0 && 
                  elapsed.sec() > 10 && !didit &&
index 5efe820ec6d66e283928326e8ca77ae7babc9846..34d8cecc202009ea60891d4a44cbcc566dd0cff3 100644 (file)
@@ -9,18 +9,23 @@
  * oid - object id
  * op  - OSD_OP_DELETE, etc.
  *
- * rg_role  - who we want ... 0 == primary, this is what clients/mds will do.
- * rg_nrep  - how many replicas we want ... just for writes currently?
- *
  */
 
+#define OSD_OP_MKFS       20
+
+// client ops
 #define OSD_OP_READ       1
 #define OSD_OP_WRITE      2
 #define OSD_OP_STAT       10
 #define OSD_OP_DELETE     11
 #define OSD_OP_TRUNCATE   12
 #define OSD_OP_ZERORANGE  13
-#define OSD_OP_MKFS       20
+
+// replication/recovery -- these ops are relative to a specific object version #
+#define OSD_OP_REP_PULL    30   // whole object read
+#define OSD_OP_REP_PUSH    31   // whole object write
+#define OSD_OP_REP_REMOVE  32   // delete replica
+#define OSD_OP_REP_WRITE   33   // replicated (partial object) write
 
 #define OSD_OP_FLAG_TRUNCATE  1   // truncate object after end of write
 
@@ -32,10 +37,11 @@ typedef struct {
   object_t oid;
   repgroup_t rg;
   int rg_role;//, rg_nrep;
-  __uint64_t map_version;
+  version_t map_version;
 
   int op;
   size_t length, offset;
+  version_t version;
 
   size_t _data_len;
 } MOSDOp_st;
@@ -47,16 +53,17 @@ class MOSDOp : public Message {
   friend class MOSDOpReply;
 
  public:
-  long get_tid() { return st.tid; }
+  long       get_tid() { return st.tid; }
   msg_addr_t get_asker() { return st.asker; }
 
-  object_t get_oid() { return st.oid; }
+  object_t   get_oid() { return st.oid; }
   repgroup_t get_rg() { return st.rg; }
+  version_t  get_map_version() { return st.map_version; }
+
   int        get_rg_role() { return st.rg_role; }  // who am i asking for?
-  //int        get_rg_nrep() { return st.rg_nrep; }
-  __uint64_t get_map_version() { return st.map_version; }
+  version_t  get_version() { return st.version; }
 
-  int get_op() { return st.op; }
+  int    get_op() { return st.op; }
   size_t get_length() { return st.length; }
   size_t get_offset() { return st.offset; }
 
@@ -74,7 +81,7 @@ class MOSDOp : public Message {
   long get_pcid() { return st.pcid; }
 
   MOSDOp(long tid, msg_addr_t asker, 
-                object_t oid, repgroup_t rg, __uint64_t mapversion, int op) :
+                object_t oid, repgroup_t rg, version_t mapversion, int op) :
        Message(MSG_OSD_OP) {
        memset(&st, 0, sizeof(st));
        this->st.tid = tid;
@@ -93,7 +100,7 @@ class MOSDOp : public Message {
 
   void set_length(size_t l) { st.length = l; }
   void set_offset(size_t o) { st.offset = o; }
-
+  void set_version(version_t v) { st.version = v; }
   
   // marshalling
   virtual void decode_payload() {
index e48919228a10f24f08c4f06aab3e66b979ea3ee2..40ae0748ec7c4f82eabcdf0c21baada585082994 100644 (file)
@@ -28,6 +28,7 @@ typedef struct {
   int    result;
   size_t length, offset;
   size_t object_size;
+  version_t version;
 
   __uint64_t _new_map_version;
   size_t _data_len, _oc_len;
@@ -48,6 +49,7 @@ class MOSDOpReply : public Message {
   size_t get_length() { return st.length; }
   size_t get_offset() { return st.offset; }
   size_t get_object_size() { return st.object_size; }
+  version_t get_version() { return st.version; }
 
   void set_result(int r) { st.result = r; }
   void set_length(size_t s) { st.length = s; }
@@ -85,6 +87,7 @@ class MOSDOpReply : public Message {
 
        this->st.length = req->st.length;   // speculative... OSD should ensure these are correct
        this->st.offset = req->st.offset;
+       this->st.version = req->st.version;
 
        // attach updated cluster spec?
        if (oc &&
index 0d7036b10957bb8b12a7d644be9809700f5b8294..95f6346d3de569e89c55328e9b9a6e8feaa7196d 100644 (file)
 #undef dout
 #define  dout(l)    if (l<=g_conf.debug) cout << "osd" << whoami << ".fakestore "
 
+#include "include/bufferlist.h"
+
+#include <map>
+#include <ext/hash_map>
+using namespace __gnu_cxx;
 
 // crap-a-crap hash
 #define HASH_DIRS       128LL
@@ -26,6 +31,8 @@
 // end crap hash
 
 
+map<int, hash_map<object_t, map<const char*, bufferptr> > > fakeattrs;
+
 
 FakeStore::FakeStore(char *base, int whoami) 
 {
@@ -268,18 +275,43 @@ int FakeStore::write(object_t oid,
 int FakeStore::setattr(object_t oid, const char *name,
                                           void *value, size_t size)
 {
-  string fn;
-  get_oname(oid, fn);
-  return setxattr(fn.c_str(), name, value, size, 0);
+  if (1) {
+       bufferptr bp(new buffer((char*)value,size));
+       fakeattrs[whoami][oid][name] = bp;
+       return 0;
+  } else {
+       string fn;
+       get_oname(oid, fn);
+       int r = setxattr(fn.c_str(), name, value, size, 0);
+       if (r == -1) 
+         cerr << " errno is " << errno << " " << strerror(errno) << endl;
+       assert(r == 0);
+       return r;
+  }
 }
 
 
 int FakeStore::getattr(object_t oid, const char *name,
                                           void *value, size_t size)
 {
-  string fn;
-  get_oname(oid, fn);
-  return getxattr(fn.c_str(), name, value, size);
+  if (1) {
+       if (fakeattrs[whoami][oid].count(name)) {
+         size_t l = fakeattrs[whoami][oid][name].length();
+         if (l > size) l = size;
+         bufferlist bl;
+         bl.append(fakeattrs[whoami][oid][name]);
+         bl.copy(0, l, (char*)value);
+         return l;
+       } else {
+         return -1;
+       }
+  } else {
+       string fn;
+       get_oname(oid, fn);
+       int r = getxattr(fn.c_str(), name, value, size);
+       //      assert(r == 0);
+       return r;
+  }
 }
 
 int FakeStore::listattr(object_t oid, char *attrs, size_t size)
index 9fbc81cc23ff7c341559f34015bab3a3923d71f0..201fea241c0fbd6ba7e12597fd9dc5621e0fa5ab 100644 (file)
@@ -63,6 +63,12 @@ OSD::OSD(int id, Messenger *m)
 
   last_tid = 0;
 
+  max_recovery_ops = 5;
+
+  pending_ops = 0;
+  waiting_for_no_ops = false;
+
+
   // use fake store
 #ifdef USE_OBFS
   store = new OBFSStore(whoami, NULL, "/dev/sdb3");
@@ -150,8 +156,44 @@ int OSD::shutdown()
 
 
 
+// object locks
 
+void OSD::lock_object(object_t oid) 
+{
+  osd_lock.Lock();
+  if (object_lock.count(oid)) {
+       Cond c;
+       dout(7) << "lock_object " << hex << oid << dec << " waiting as " << &c << endl;
+       object_lock_waiters[oid].push_back(&c);
+       c.Wait(osd_lock);
+       assert(object_lock.count(oid));
+  } else {
+       dout(7) << "lock_object " << hex << oid << dec << endl;
+       object_lock.insert(oid);
+  }
+  osd_lock.Unlock();
+}
 
+void OSD::unlock_object(object_t oid) 
+{
+  osd_lock.Lock();
+  assert(object_lock.count(oid));
+  if (object_lock_waiters.count(oid)) {
+       // someone is in line
+       list<Cond*>& ls = object_lock_waiters[oid];
+       Cond *c = ls.front();
+       dout(7) << "unlock_object " << hex << oid << dec << " waking up next guy " << c << endl;
+       ls.pop_front();
+       if (ls.empty()) 
+         object_lock_waiters.erase(oid);
+       c->Signal();
+  } else {
+       // nobody waiting
+       dout(7) << "unlock_object " << hex << oid << dec << endl;
+       object_lock.erase(oid);
+  }
+  osd_lock.Unlock();
+}
 
 
 // --------------------------------------
@@ -213,20 +255,52 @@ void OSD::dispatch(Message *m)
        dout(1) << " got unknown message " << m->get_type() << endl;
        assert(0);
   }
+
+  if (!finished.empty()) {
+       list<Message*> waiting;
+       waiting.splice(waiting.begin(), finished);
+       for (list<Message*>::iterator it = waiting.begin();
+                it != waiting.end();
+                it++) {
+         dispatch(*it);
+       }
+  }
+
 }
 
 
 void OSD::handle_op_reply(MOSDOpReply *m)
 {
-  replica_write_lock.Lock();
-  MOSDOp *op = replica_writes[m->get_tid()];
-  dout(7) << "got replica write ack tid " << m->get_tid() << " orig op " << op << endl;
+  switch (m->get_op()) {
+  case OSD_OP_REP_PULL:
+       op_rep_pull_reply(m);
+       break;
+  case OSD_OP_REP_PUSH:
+       op_rep_push_reply(m);
+       break;
+  case OSD_OP_REP_REMOVE:
+       op_rep_remove_reply(m);
+       break;
 
-  replica_write_tids[op].erase(m->get_tid());
-  if (replica_write_tids[op].empty())
-       replica_write_cond[op]->Signal();
+  case OSD_OP_REP_WRITE:
+       { // oldcrap
+         /*
+         replica_write_lock.Lock();
+         MOSDOp *op = replica_writes[m->get_tid()];
+         dout(7) << "got replica write ack tid " << m->get_tid() << " orig op " << op << endl;
+         
+         replica_write_tids[op].erase(m->get_tid());
+         if (replica_write_tids[op].empty())
+               replica_write_cond[op]->Signal();
+         
+         replica_write_lock.Unlock();
+         */
+       }
+       break;
 
-  replica_write_lock.Unlock();
+  default:
+       assert(0);
+  }
 }
 
 
@@ -279,8 +353,10 @@ void OSD::update_map(bufferlist& state)
 
 void OSD::handle_osd_map(MOSDMap *m)
 {
-  // SAB
-  osd_lock.Lock();
+  // wait for ops to finish
+  wait_for_no_ops();
+
+  osd_lock.Lock();     // actually, don't need this if we finish all ops?
 
   if (!osdmap ||
          m->get_version() > osdmap->get_version()) {
@@ -294,20 +370,13 @@ void OSD::handle_osd_map(MOSDMap *m)
        delete m;
 
        // process waiters
-       list<Message*> waiting;
-       waiting.splice(waiting.begin(), waiting_for_osdmap);
+       take_waiters(waiting_for_osdmap);
 
-       osd_lock.Unlock();
-       
-       for (list<Message*>::iterator it = waiting.begin();
-                it != waiting.end();
-                it++) {
-         dispatch(*it);
-       }
   } else {
        dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
-       osd_lock.Unlock();
   }
+  
+  osd_lock.Unlock();
 }
 
 
@@ -326,36 +395,44 @@ void OSD::get_rg_list(list<repgroup_t>& ls)
 }
 
 
-bool OSD::rg_exists(repgroup_t rg) 
+bool OSD::rg_exists(repgroup_t rgid
 {
   struct stat st;
-  if (store->collection_stat(rg, &st) == 0) 
+  if (store->collection_stat(rgid, &st) == 0) 
        return true;
   else
        return false;
 }
 
 
-RG *OSD::open_rg(repgroup_t rg)
+RG *OSD::open_rg(repgroup_t rgid)
 {
   // already open?
-  if (rg_map.count(rg)) 
-       return rg_map[rg];
-
-  // stat collection
-  RG *r = new RG(rg);
-  if (rg_exists(rg)) {
-       // exists
-       r->fetch(store);
-  } else {
-       // dne
-       r->store(store);
-  }
-  rg_map[rg] = r;
+  if (rg_map.count(rgid)) 
+       return rg_map[rgid];
 
-  return r;
+  // exists?
+  if (!rg_exists(rgid))
+       return 0;
+
+  // open, stat collection
+  RG *rg = new RG(whoami, rgid);
+  rg->fetch(store);
+  rg_map[rgid] = rg;
+
+  return rg;
 }
  
+RG *OSD::new_rg(repgroup_t rgid)
+{
+  assert(rg_map.count(rgid) == 0);
+  assert(!rg_exists(rgid));
+
+  RG *rg = new RG(whoami, rgid);
+  rg->store(store);
+  rg_map[rgid] = rg;
+  return rg;
+}
 
 
 
@@ -387,14 +464,13 @@ void OSD::scan_rg()
        int nrep = osdmap->repgroup_to_acting_osds(rgid, acting);
        assert(nrep > 0);
        int primary = acting[0];
-       int role = -1;
+       int role = -1;        // -1, 0, 1
        for (int i=0; i<nrep; i++) 
-         if (acting[i] == whoami) role = i;
+         if (acting[i] == whoami) role = i>0 ? 1:0;
        
-
        if (role != rg->get_role()) {
          // role change.
-         dout(10) << " rg " << rgid << " acting role change " << rg->get_role() << " -> " << role << endl; 
+         dout(10) << " rg " << hex << rgid << dec << " acting role change " << rg->get_role() << " -> " << role << endl; 
          
          // am i old-primary?
          if (rg->get_role() == 0) {
@@ -402,7 +478,7 @@ void OSD::scan_rg()
                for (map<int, RGPeer*>::iterator it = rg->get_peers().begin();
                         it != rg->get_peers().end();
                         it++) {
-                 dout(10) << " rg " << rgid << " old-primary, dropping old peer " << it->first << endl;
+                 dout(10) << " rg " << hex << rgid << dec << " old-primary, dropping old peer " << it->first << endl;
                  rg->get_old_replica_set().insert(it->first);
                  delete it->second;
                }
@@ -431,7 +507,7 @@ void OSD::scan_rg()
                
                // did primary change?
                if (primary != rg->get_primary()) {
-                 dout(10) << " rg " << rgid << " acting primary change " << rg->get_primary() << " -> " << primary << endl;
+                 dout(10) << " rg " << hex << rgid << dec << " acting primary change " << rg->get_primary() << " -> " << primary << endl;
                  
                  // re-peer
                  rg->state_clear(RG_STATE_PEERED);
@@ -451,7 +527,7 @@ void OSD::scan_rg()
          // check replicas
          for (int r=1; r<nrep; r++) {
                if (rg->get_peer(r) == 0) {
-                 dout(10) << " rg " << rgid << " primary needs to peer with replica " << r << " osd" << acting[r] << endl;
+                 dout(10) << " rg " << hex << rgid << dec << " primary needs to peer with replica " << r << " osd" << acting[r] << endl;
                  start_map[acting[r]][rg] = r;
                } 
          }
@@ -475,6 +551,7 @@ void OSD::scan_rg()
 }
 
 
+
 /** peer_notify
  * Send an MOSDRGNotify to a primary, with a list of RGs that I have
  * content for, and they are primary for.
@@ -560,24 +637,23 @@ void OSD::handle_rg_notify(MOSDRGNotify *m)
        assert(nrep > 0);
        assert(acting[0] == whoami);
        
-       // get/open RG
+       // open RG?
        RG *rg = open_rg(rgid);
 
        // previously unknown RG?
-       if (rg->get_peers().empty()) {
-         dout(10) << " rg " << rgid << " is new" << endl;
+       if (!rg) {
+         rg = new_rg(rgid);
+         dout(10) << " rg " << hex << rgid << dec << " is new, nrep=" << nrep << endl;
          for (int r=1; r<nrep; r++) {
-               if (rg->get_peer(r) == 0) {
-                 dout(10) << " rg " << rgid << " primary needs to peer with replica " << r << " osd" << acting[r] << endl;
-                 start_map[acting[r]][rg] = r;
-               } 
+               dout(10) << " rg " << hex << rgid << dec << " primary needs to peer with replica " << r << " osd" << acting[r] << endl;
+               start_map[acting[r]][rg] = r;
          }
        }
 
        // peered with this guy specifically?
        RGPeer *rgp = rg->get_peer(from);
        if (!rgp && start_map[from].count(rg) == 0) {
-         dout(7) << " rg " << rgid << " primary needs to peer with residual notifier osd" << from << endl;
+         dout(7) << " rg " << hex << rgid << dec << " primary needs to peer with residual notifier osd" << from << endl;
          start_map[from][rg] = -1; 
        }
   }
@@ -624,35 +700,39 @@ void OSD::handle_rg_peer(MOSDRGPeer *m)
           it++) {
        repgroup_t rgid = *it;
        
+       // open RG
+       RG *rg = open_rg(rgid);
+
        // dne?
-       if (!rg_exists(rgid)) {
-         dout(10) << " rg " << rgid << " dne" << endl;
-         ack->rg_dne.push_back(rgid);
-         continue;
+       if (!rg) {
+         // get active rush mapping
+         vector<int> acting;
+         int nrep = osdmap->repgroup_to_acting_osds(rgid, acting);
+         assert(nrep > 0);
+         int role = -1;
+         for (int i=0; i<nrep; i++) 
+               if (acting[i] == whoami) role = i>0 ? 1:0;
+         assert(role != 0);
+
+         if (role < 0) {
+               dout(10) << " rg " << hex << rgid << dec << " dne, and i am not an active replica" << endl;
+               ack->rg_dne.push_back(rgid);
+               continue;
+         }
+         
+         dout(10) << " rg " << hex << rgid << dec << " dne (yet), but i am new active replica " << role << endl;
+         rg = new_rg(rgid);
        }
 
-       // get/open RG
-       RG *rg = open_rg(rgid);
-
        // report back state and rg content
        ack->rg_state[rgid].state = rg->get_state();
        ack->rg_state[rgid].deleted = rg->get_deleted_objects();
 
        // list objects
-       list<object_t> olist;
-       rg->list_objects(store,olist);
-
-       dout(10) << " rg " << rgid << " has state " << rg->get_state() << ", " << olist.size() << " objects" << endl;
-
-       for (list<object_t>::iterator it = olist.begin();
-                it != olist.end();
-                it++) {
-         version_t v = 0;
-         store->getattr(*it, 
-                                         "version",
-                                         &v, sizeof(v));
-         ack->rg_state[rgid].objects[*it] = v;
-       }
+       rg->scan_local_objects(store);
+       ack->rg_state[rgid].objects = rg->local_objects;
+       
+       dout(10) << " rg " << hex << rgid << dec << " has state " << rg->get_state() << ", " << ack->rg_state[rgid].objects.size() << " objects" << endl;
   }
 
   // reply
@@ -691,7 +771,7 @@ void OSD::handle_rg_peer_ack(MOSDRGPeerAck *m)
   for (list<repgroup_t>::iterator it = m->rg_dne.begin();
           it != m->rg_dne.end();
           it++) {
-       dout(10) << " rg " << *it << " dne on osd" << from << endl;
+       dout(10) << " rg " << hex << *it << dec << " dne on osd" << from << endl;
        
        RG *rg = open_rg(*it);
        assert(rg);
@@ -707,7 +787,7 @@ void OSD::handle_rg_peer_ack(MOSDRGPeerAck *m)
   for (map<repgroup_t, RGReplicaInfo>::iterator it = m->rg_state.begin();
           it != m->rg_state.end();
           it++) {
-       dout(10) << " rg " << it->first << " got state " << it->second.state 
+       dout(10) << " rg " << hex << it->first << dec << " got state " << it->second.state 
                         << " " << it->second.objects.size() << " objects, " 
                         << it->second.deleted.size() << " deleted" << endl;
 
@@ -717,6 +797,23 @@ void OSD::handle_rg_peer_ack(MOSDRGPeerAck *m)
        assert(rgp);
 
        rgp->peer_state = it->second;
+       rgp->state_set(RG_PEER_STATE_ACTIVE);
+
+       // fully peered?
+       bool fully = true;
+       for (map<int, RGPeer*>::iterator pit = rg->get_peers().begin();
+                pit != rg->get_peers().end();
+                pit++) {
+         if (!pit->second->is_active()) fully = false;
+       }
+
+       if (fully) {
+         dout(10) << " rg " << hex << it->first << dec << " fully peered, analyzing" << endl;
+         rg->mark_peered();
+         rg->analyze_peers(store);
+
+         do_recovery(rg);
+       }         
   }
 
   // done
@@ -725,6 +822,348 @@ void OSD::handle_rg_peer_ack(MOSDRGPeerAck *m)
 
 
 
+// RECOVERY
+
+void OSD::do_recovery(RG *rg)
+{
+  // pull?
+  if (!rg->is_complete()) {
+       rg_pull(rg, max_recovery_ops);
+  } else {
+       if (!rg->is_clean()) {
+         rg_push(rg, max_recovery_ops);
+         rg_clean(rg, max_recovery_ops);
+       }
+  }
+}
+
+
+// pull
+
+void OSD::rg_pull(RG *rg, int maxops)
+{
+  int ops = rg->num_active_ops();
+
+  dout(7) << "rg_pull rg " << hex << rg->get_rgid() << dec << " " << ops << "/" << maxops << " active ops" <<  endl;
+  
+  while (ops < maxops) {
+       object_t oid;
+       version_t v;
+       int peer;
+       if (!rg->pull_plan.get_next(oid, v, peer)) break;
+       RGPeer *rgp = rg->get_proxy_peer(oid);
+       if (rgp == 0) {
+         dout(7) << " apparently already pulled " << hex << oid << dec << endl;
+         continue;
+       }
+       if (rgp->is_pulling(oid)) {
+         dout(7) << " already pulling " << hex << oid << dec << endl;
+         continue;
+       }
+       pull_replica(oid, v, rgp);
+       ops++;
+  }  
+}
+
+void OSD::pull_replica(object_t oid, version_t v, RGPeer *p)
+{
+  dout(7) << "pull_replica " << hex << oid << dec << " v " << v << " from osd" << p->get_peer() << endl;
+
+  // add to fetching list
+  p->pull(oid, v);
+
+  // send op
+  __uint64_t tid = ++last_tid;
+  MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
+                                                 oid, p->rg->get_rgid(),
+                                                 osdmap->get_version(),
+                                                 OSD_OP_REP_PULL);
+  op->set_version(v);
+  op->set_rg_role(-1);  // whatever, not 0
+  messenger->send_message(op, MSG_ADDR_OSD(p->get_peer()));
+
+  // register
+  pull_ops[tid] = p;
+}
+
+void OSD::op_rep_pull(MOSDOp *op)
+{
+  dout(7) << "rep_pull on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
+  lock_object(op->get_oid());
+  
+  // get object size
+  struct stat st;
+  int r = store->stat(op->get_oid(), &st);
+  assert(r == 0);
+
+  // check version
+  version_t v = 0;
+  store->getattr(op->get_oid(), "version", &v, sizeof(v));
+  assert(v == op->get_version());
+  
+  // read
+  bufferptr bptr = new buffer(st.st_size);   // prealloc space for entire read
+  long got = store->read(op->get_oid(), 
+                                                st.st_size, 0,
+                                                bptr.c_str());
+  assert(got == st.st_size);
+  
+  // reply
+  MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap); 
+  bptr.set_length(got);   // properly size the buffer
+  bufferlist bl;
+  bl.push_back( bptr );
+  reply->set_result(0);
+  reply->set_data(bl);
+  reply->set_length(got);
+  reply->set_offset(0);
+  
+  messenger->send_message(reply, op->get_asker());
+
+  unlock_object(op->get_oid());
+  delete op;
+}
+
+void OSD::op_rep_pull_reply(MOSDOpReply *op)
+{
+  dout(7) << "op_rep_pull_reply " << hex << op->get_oid() << dec << " size " << op->get_length() << endl;
+
+  osd_lock.Lock();
+  RGPeer *p = pull_ops[op->get_tid()];
+  RG *rg = p->rg;
+  assert(p);   // FIXME: how will this work?
+  assert(p->is_pulling(op->get_oid()));
+  assert(p->pulling_version(op->get_oid()) == op->get_version());
+  osd_lock.Unlock();
+
+  // write it and add it to the RG
+  store->write(op->get_oid(), op->get_length(), 0, op->get_data().c_str());
+  p->rg->add_object(store, op->get_oid());
+
+  // close out pull op.
+  osd_lock.Lock();
+  pull_ops.erase(op->get_tid());
+  rg->pulled(op->get_oid(), op->get_version(), p);
+
+  // finish waiters
+  if (waiting_for_object.count(op->get_oid())) 
+       take_waiters(waiting_for_object[op->get_oid()]);
+
+  // more?
+  do_recovery(rg);
+
+  osd_lock.Unlock();
+  
+  delete op;
+}
+
+
+// push
+
+void OSD::rg_push(RG *rg, int maxops)
+{
+  int ops = rg->num_active_ops();
+
+  dout(7) << "rg_push rg " << hex << rg->get_rgid() << dec << " " << ops << "/" << maxops << " active ops" <<  endl;
+  
+  while (ops < maxops) {
+       object_t oid;
+       version_t v;
+       int peer;
+       if (!rg->push_plan.get_next(oid, v, peer)) break;
+
+       RGPeer *p = rg->get_peer(peer);
+       assert(p);
+       push_replica(oid, v, p);
+       ops++;
+  }  
+  
+}
+
+void OSD::push_replica(object_t oid, version_t v, RGPeer *p)
+{
+  dout(7) << "push_replica " << hex << oid << dec << " v " << v << " to osd" << p->get_peer() << endl;
+
+  // add to list
+  p->push(oid, v);
+
+  // send op
+  __uint64_t tid = ++last_tid;
+  MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
+                                                 oid, p->rg->get_rgid(),
+                                                 osdmap->get_version(),
+                                                 OSD_OP_REP_PUSH);
+  op->set_version(v);
+  op->set_rg_role(-1);  // whatever, not 0
+
+  // include object content
+  struct stat st;
+  store->stat(oid, &st);
+  bufferptr b = new buffer(st.st_size);
+  store->read(oid, st.st_size, 0, b.c_str());
+  op->get_data().append(b);
+  op->set_length(st.st_size);
+  op->set_offset(0);
+
+  messenger->send_message(op, MSG_ADDR_OSD(p->get_peer()));
+
+  // register
+  push_ops[tid] = p;
+}
+
+void OSD::op_rep_push(MOSDOp *op)
+{
+  dout(7) << "rep_push on " << hex << op->get_oid() << dec << " v " << op->get_version() <<  endl;
+  lock_object(op->get_oid());
+
+  // exists?
+  if (store->exists(op->get_oid())) {
+       store->truncate(op->get_oid(), 0);
+
+       version_t ov = 0;
+       store->getattr(op->get_oid(), "version", &ov, sizeof(ov));
+       assert(ov <= op->get_version());
+  }
+
+  // write out buffers
+  bufferlist bl;
+  bl.claim( op->get_data() );
+
+  off_t off = 0;
+  for (list<bufferptr>::iterator it = bl.buffers().begin();
+          it != bl.buffers().end();
+          it++) {
+       int r = store->write(op->get_oid(),
+                                                (*it).length(), off,
+                                                (*it).c_str(), 
+                                                false);  // write async, no rush
+       assert((unsigned)r == (*it).length());
+       off += (*it).length();
+  }
+
+  // set version
+  version_t v = op->get_version();
+  store->setattr(op->get_oid(), "version", &v, sizeof(v));
+
+  // reply
+  MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap);
+  messenger->send_message(reply, op->get_asker());
+  
+  unlock_object(op->get_oid());
+  delete op;
+}
+
+void OSD::op_rep_push_reply(MOSDOpReply *op)
+{
+  dout(7) << "op_rep_push_reply " << hex << op->get_oid() << dec << endl;
+
+  osd_lock.Lock();
+  RGPeer *p = push_ops[op->get_tid()];
+  RG *rg = p->rg;
+  assert(p);   // FIXME: how will this work?
+  assert(p->is_pushing(op->get_oid()));
+  assert(p->pushing_version(op->get_oid()) == op->get_version());
+
+  // close out push op.
+  push_ops.erase(op->get_tid());
+  rg->pushed(op->get_oid(), op->get_version(), p);
+
+  // more?
+  do_recovery(rg);
+
+  osd_lock.Unlock();
+  
+  delete op;
+}
+
+
+// clean
+
+void OSD::rg_clean(RG *rg, int maxops)
+{
+  int ops = rg->num_active_ops();
+
+  dout(7) << "rg_clean rg " << hex << rg->get_rgid() << dec << " " << ops << "/" << maxops << " active ops" <<  endl;
+  
+  while (ops < maxops) {
+       object_t oid;
+       version_t v;
+       int peer;
+       if (!rg->clean_plan.get_next(oid, v, peer)) break;
+
+       RGPeer *p = rg->get_peer(peer);
+       assert(p);
+       remove_replica(oid, v, p);
+       ops++;
+  }  
+}
+
+void OSD::remove_replica(object_t oid, version_t v, RGPeer *p)
+{
+  dout(7) << "remove_replica " << hex << oid << dec << " v " << v << " from osd" << p->get_peer() << endl;
+
+  p->remove(oid, v);
+  
+  // send op
+  __uint64_t tid = ++last_tid;
+  MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
+                                                 oid, p->rg->get_rgid(),
+                                                 osdmap->get_version(),
+                                                 OSD_OP_REP_REMOVE);
+  op->set_version(v);
+  op->set_rg_role(-1);  // whatever, not 0
+  messenger->send_message(op, MSG_ADDR_OSD(p->get_peer()));
+  
+  // register
+  remove_ops[tid] = p;
+}
+
+void OSD::op_rep_remove(MOSDOp *op)
+{
+  dout(7) << "rep_remove on " << hex << op->get_oid() << dec << " v " << op->get_version() <<  endl;
+  lock_object(op->get_oid());
+  
+  // sanity checks
+  assert(store->exists(op->get_oid()));
+
+  version_t v = 0;
+  store->getattr(op->get_oid(), "version", &v, sizeof(v));
+  assert(v == op->get_version());
+
+  // remove
+  int r = store->remove(op->get_oid());
+  assert(r == 0);
+
+  // reply
+  messenger->send_message(new MOSDOpReply(op, r, osdmap), 
+                                                 op->get_asker());
+
+  unlock_object(op->get_oid());
+  delete op;
+}
+
+void OSD::op_rep_remove_reply(MOSDOpReply *op)
+{
+  dout(7) << "op_rep_remove_reply " << hex << op->get_oid() << dec << endl;
+
+  osd_lock.Lock();
+  RGPeer *p = remove_ops[op->get_tid()];
+  RG *rg = p->rg;
+  assert(p);   // FIXME: how will this work?
+  assert(p->is_removing(op->get_oid()));
+  assert(p->removing_version(op->get_oid()) == op->get_version());
+
+  // close out push op.
+  remove_ops.erase(op->get_tid());
+  rg->removed(op->get_oid(), op->get_version(), p);
+
+  // more?
+  do_recovery(rg);
+
+  osd_lock.Unlock();
+  
+  delete op;
+}
 
 
 
@@ -765,13 +1204,19 @@ void OSD::handle_op(MOSDOp *op)
   if (op->get_map_version() < osdmap->get_version()) {
        // op's is old
        dout(7) << "op map " << op->get_map_version() << " < " << osdmap->get_version() << endl;
+       
+       if (op->get_rg_role() != 0) {
+         dout(7) << " dropping rep op with old map" << endl;
+         delete op;
+         return;
+       }
   }
 
 
   // did this op go to the right OSD?
   if (op->get_rg_role() == 0) {
-    repgroup_t rg = op->get_rg();
-       int acting_primary = osdmap->get_rg_acting_primary( rg );
+    repgroup_t rgid = op->get_rg();
+       int acting_primary = osdmap->get_rg_acting_primary( rgid );
        
        if (acting_primary != whoami) {
          dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
@@ -779,6 +1224,36 @@ void OSD::handle_op(MOSDOp *op)
          logger->inc("fwd");
          return;
        }
+
+       // proxy?
+       RG *rg = open_rg(rgid);
+       if (!rg) {
+         // fail now?
+         dout(7) << "hit non-existent rg " << hex << op->get_rg() << dec << ", creating willy nilly for now" << endl;
+         rg = new_rg(rgid);  // for now.. FIXME
+       }
+       else {
+         if (!rg->is_complete()) {
+               // consult RG object map
+               RGPeer *rgp = rg->get_proxy_peer(op->get_oid());
+               version_t v = rg->get_proxy_version(op->get_oid());
+
+               if (op->get_op() == OSD_OP_WRITE && v == 0) {
+                 // totally new object.
+               }
+               else if (rgp) {
+                 // need to pull
+                 dout(7) << "need to pull object " << hex << op->get_oid() << dec << endl;
+                 RGPeer *rgp = rg->get_proxy_peer(op->get_oid());
+                 if (!rgp->is_pulling(op->get_oid())) {
+                       pull_replica(op->get_oid(), v, rgp);
+                 }
+                 waiting_for_object[op->get_oid()].push_back(op);
+                 return;
+               }
+         }
+         
+       }       
   }
 
   // queue op
@@ -786,6 +1261,11 @@ void OSD::handle_op(MOSDOp *op)
 }
 
 void OSD::queue_op(MOSDOp *op) {
+  // inc pending count
+  osd_lock.Lock();
+  pending_ops++;
+  osd_lock.Unlock();
+
   threadpool->put_op(op);
 }
   
@@ -796,35 +1276,63 @@ void OSD::do_op(MOSDOp *op)
   // do the op
   switch (op->get_op()) {
 
+  case OSD_OP_MKFS:
+    op_mkfs(op);
+    break;
+
   case OSD_OP_READ:
     op_read(op);
     break;
-
   case OSD_OP_WRITE:
     op_write(op);
     break;
-
-  case OSD_OP_MKFS:
-    op_mkfs(op);
-    break;
-
   case OSD_OP_DELETE:
     op_delete(op);
     break;
-
   case OSD_OP_TRUNCATE:
     op_truncate(op);
     break;
-
   case OSD_OP_STAT:
     op_stat(op);
     break;
+
+       // replication/recovery
+  case OSD_OP_REP_PULL:
+       op_rep_pull(op);
+       break;
+  case OSD_OP_REP_PUSH:
+       op_rep_push(op);
+       break;
+  case OSD_OP_REP_REMOVE:
+       op_rep_remove(op);
+       break;
        
   default:
     assert(0);
   }
+
+  // finish
+  osd_lock.Lock();
+  assert(pending_ops > 0);
+  pending_ops--;
+  if (pending_ops == 0 && waiting_for_no_ops)
+       no_pending_ops.Signal();
+  osd_lock.Unlock();
 }
 
+void OSD::wait_for_no_ops()
+{
+  osd_lock.Lock();
+  if (pending_ops > 0) {
+       dout(7) << "wait_for_no_ops - waiting for " << pending_ops << endl;
+       waiting_for_no_ops = true;
+       no_pending_ops.Wait(osd_lock);
+       waiting_for_no_ops = false;
+       assert(pending_ops == 0);
+  } 
+  dout(7) << "wait_for_no_ops - none" << endl;
+  osd_lock.Unlock();
+}
 
 void OSD::op_read(MOSDOp *r)
 {
@@ -851,7 +1359,7 @@ void OSD::op_read(MOSDOp *r)
        reply->set_length(0);
   }
   
-  dout(10) << "read got " << got << " / " << r->get_length() << " bytes from " << r->get_oid() << endl;
+  dout(12) << "read got " << got << " / " << r->get_length() << " bytes from obj " << hex << r->get_oid() << dec << endl;
 
   logger->inc("rd");
   if (got >= 0) logger->inc("rdb", got);
@@ -924,19 +1432,28 @@ void OSD::op_write(MOSDOp *op)
                                                 write_sync);  // write synchronously
        off += (*it).length();
        if (r < 0) {
-         dout(1) << "write error on " << op->get_oid() << " len " << (*it).length() << "  off " << off << "  r = " << r << endl;
+         dout(1) << "write error on " << hex << op->get_oid() << dec << " len " << (*it).length() << "  off " << off << "  r = " << r << endl;
          assert(r >= 0);
        }
   }
 
   // update object metadata
-  if (!existed) {
+  osd_lock.Lock();
+  version_t v = 1;
+  if (op->get_rg_role() == -1) {
+       v = op->get_version();
+       store->setattr(op->get_oid(), "version", &v, sizeof(v));
+  } else if (existed) {
+       // get + inc version
+       store->getattr(op->get_oid(), "version", &v, sizeof(v));
+       v++;
+  } else {
        // add to RG collection
-       osd_lock.Lock();
        RG *r = open_rg(op->get_rg());
        r->add_object(store, op->get_oid());
-       osd_lock.Unlock();
   }
+  store->setattr(op->get_oid(), "version", &v, sizeof(v));
+  osd_lock.Unlock();
 
   logger->inc("wr");
   logger->inc("wrb", op->get_length());
@@ -981,7 +1498,7 @@ void OSD::op_mkfs(MOSDOp *op)
 void OSD::op_delete(MOSDOp *op)
 {
   int r = store->remove(op->get_oid());
-  dout(3) << "delete on " << op->get_oid() << " r = " << r << endl;
+  dout(12) << "delete on " << hex << op->get_oid() << dec << " r = " << r << endl;
   
   // "ack"
   messenger->send_message(new MOSDOpReply(op, r, osdmap), op->get_asker());
@@ -993,7 +1510,7 @@ void OSD::op_delete(MOSDOp *op)
 void OSD::op_truncate(MOSDOp *op)
 {
   int r = store->truncate(op->get_oid(), op->get_offset());
-  dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << r << endl;
+  dout(3) << "truncate on " << hex << op->get_oid() << dec << " at " << op->get_offset() << " r = " << r << endl;
   
   // "ack"
   messenger->send_message(new MOSDOpReply(op, r, osdmap), op->get_asker());
@@ -1009,7 +1526,7 @@ void OSD::op_stat(MOSDOp *op)
   memset(&st, sizeof(st), 0);
   int r = store->stat(op->get_oid(), &st);
   
-  dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl;
+  dout(3) << "stat on " << hex << op->get_oid() << dec << " r = " << r << " size = " << st.st_size << endl;
          
   MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap);
   reply->set_object_size(st.st_size);
index 80c2d54ef05f8065796ee1579d8ad5b8f2d5e98e..32135a225c7172bbff406f7b8f06b9ef8cdee881 100644 (file)
 
 #include "ObjectStore.h"
 
+#include "RG.h"
+
 #include <map>
 using namespace std;
 #include <ext/hash_map>
+#include <ext/hash_set>
 using namespace __gnu_cxx;
 
 
 class Messenger;
 class Message;
 
-typedef __uint64_t version_t;
-
-
-struct RGReplicaInfo {
-  int state;
-  map<object_t,version_t>  objects;        // remote object list
-  map<object_t,version_t>  deleted;        // remote delete list
-
-  void _encode(bufferlist& blist) {
-       blist.append((char*)&state, sizeof(state));
-       ::_encode(objects, blist);
-       ::_encode(deleted, blist);
-  }
-  void _decode(bufferlist& blist, int& off) {
-       blist.copy(off, sizeof(state), (char*)&state);
-       off += sizeof(state);
-       ::_decode(objects, blist, off);
-       ::_decode(deleted, blist, off);
-  }
-
-  RGReplicaInfo() : state(0) { }
-};
-
-
-/** RGPeer
- * state associated with non-primary OSDS with RG content.
- * only used by primary.
- */
-
-// by primary
-#define RG_PEER_STATE_ACTIVE    1   // peer has acked our request, sent back RG state.
-#define RG_PEER_STATE_COMPLETE  2   // peer has everything replicated
-
-class RGPeer {
- private:
-  int       peer;
-  int       role;    // 0 primary, 1+ replica, -1 residual
-  int       state;
-
-  // peer state
- public:
-  RGReplicaInfo peer_state;
-
- protected:
-  // active|residual: used by primary for syncing (old) replicas
-  map<object_t,version_t>  fetching;       // objects i'm reading from replica
-  map<object_t,version_t>  stray;          // objects that need to be deleted
-  
-  // active: used by primary for normal replication stuff
-  map<object_t,version_t>  writing;        // objects i've written to replica
-  map<object_t,version_t>  flushing;       // objects i've written to remote buffer cache only
-
- public:
-  RGPeer(int p, int r) : peer(p), role(r), state(0) { }
-
-  int get_role() { return role; }
-  int get_peer() { return peer; }
-  bool state_test(int m) { return state & m != 0; }
-  void state_set(int m) { state |= m; }
-  void state_clear(int m) { state &= ~m; }
-
-  bool is_active() { return state_test(RG_PEER_STATE_ACTIVE); }
-  bool is_complete() { return state_test(RG_PEER_STATE_COMPLETE); }
-
-  bool is_residual() { return role < 0; }
-  bool is_empty() { return is_active() && peer_state.objects.empty(); }  // *** && peer_state & COMPLETE
-};
-
-
-
-
-
-
-/** RG - Replica Group
- *
- */
 
-// bits used on any
-#define RG_STATE_COMPLETE    1  // i have full RG contents locally.
-#define RG_STATE_PEERED      2  // i have contacted prior primary and all
-                                // replica osds and/or fetched their 
-                                // content lists, and thus know what's up.
-                                // or, i have check in w/ new primary (on replica)
-
-// on primary or old-primary only
-#define RG_STATE_CLEAN       4  // i am fully replicated
-
-class RG {
- protected:
-  repgroup_t rgid;
-  int        role;    // 0 = primary, 1 = secondary, etc.  -1=undef/none.
-  int        state;   // see bit defns above
-
-  int        primary;         // replica: who the primary is (if not me)
-  set<int>   old_replica_set; // old primary: where replicas used to be
-  
-  map<int, RGPeer*>         peers;  // primary: (soft state) active peers
-
-  // for unstable states,
-  map<object_t, version_t>  deleted_objects;  // locally deleted objects
-
- public:  
-  RG(repgroup_t r) : rgid(r),
-       role(0),
-       state(0),
-       primary(-1) { }
-  
-  repgroup_t get_rgid() { return rgid; }
-  int        get_role() { return role; }
-  int        get_primary() { return primary; }
-
-  void       set_role(int r) { role = r; }
-  void       set_primary(int p) { primary = p; }
-
-  map<int, RGPeer*>& get_peers() { return peers; }
-  RGPeer* get_peer(int p) {
-       if (peers.count(p)) return peers[p];
-       return 0;
-  }
-  RGPeer* new_peer(int p, int r) {
-       return peers[p] = new RGPeer(p, r);
-  }
-  void remove_peer(int p) {
-       assert(peers.count(p));
-       delete peers[p];
-       peers.erase(p);
-  }
-
-  set<int>&                 get_old_replica_set() { return old_replica_set; }
-  map<object_t, version_t>& get_deleted_objects() { return deleted_objects; }
-
-
-  int  get_state() { return state; }
-  bool state_test(int m) { return (state & m) != 0; }
-  void set_state(int s) { state = s; }
-  void state_set(int m) { state |= m; }
-  void state_clear(int m) { state &= ~m; }
-  
-  void store(ObjectStore *store) {
-       if (!store->collection_exists(rgid))
-         store->collection_create(rgid);
-       store->collection_setattr(rgid, "role", &role, sizeof(role));
-       store->collection_setattr(rgid, "primary", &primary, sizeof(primary));
-       store->collection_setattr(rgid, "state", &state, sizeof(state));        
-  }
-  void fetch(ObjectStore *store) {
-       store->collection_getattr(rgid, "role", &role, sizeof(role));
-       store->collection_getattr(rgid, "primary", &primary, sizeof(primary));
-       store->collection_getattr(rgid, "state", &state, sizeof(state));        
-  }
-
-  void add_object(ObjectStore *store, object_t oid) {
-       store->collection_add(rgid, oid);
-  }
-  void remove_object(ObjectStore *store, object_t oid) {
-       store->collection_remove(rgid, oid);
-  }
-  void list_objects(ObjectStore *store, list<object_t>& ls) {
-       store->collection_list(rgid, ls);
-  }
-};
-
-
-/** Onode
- * per-object OSD metadata
- */
-class Onode {
-  object_t            oid;
-  version_t           version;
-
-  map<int, version_t> stray_replicas;   // osds w/ stray replicas.
-
- public:
-  Onode(object_t o) : oid(o), version(0) { }
-
-  void store(ObjectStore *store) {
-       
-  }
-  void fetch(ObjectStore *store) {
-
-  }
-
-};
 
 
 class OSD : public Dispatcher {
@@ -212,14 +33,37 @@ class OSD : public Dispatcher {
   class HostMonitor *monitor;
   class Logger      *logger;
 
+  int max_recovery_ops;
+
   // global lock
-  Mutex osd_lock;
+  Mutex osd_lock;                          
+
+  // per-object locking (serializing)
+  hash_set<object_t>               object_lock;
+  hash_map<object_t, list<Cond*> > object_lock_waiters;  
+  void lock_object(object_t oid);
+  void unlock_object(object_t oid);
+
+  // finished waiting messages, that will go at tail of dispatch()
+  list<class Message*> finished;
+  void take_waiters(list<class Message*>& ls) {
+       finished.splice(finished.end(), ls);
+  }
+  
+  // -- objects --
+  int read_onode(onode_t& onode);
+  int write_onode(onode_t& onode);
 
 
   // -- ops --
   class ThreadPool<class OSD, class MOSDOp>  *threadpool;
+  int   pending_ops;
+  bool  waiting_for_no_ops;
+  Cond  no_pending_ops;
 
   void queue_op(class MOSDOp *m);
+  void wait_for_no_ops();
+  
  public:
   void do_op(class MOSDOp *m);
   static void doop(OSD *o, MOSDOp *op) {
@@ -238,7 +82,6 @@ class OSD : public Dispatcher {
 
   
   // <old replica hack>
-  __uint64_t                     last_tid;
   Mutex                          replica_write_lock;
   map<MOSDOp*, Cond*>            replica_write_cond;
   map<MOSDOp*, set<__uint64_t> > replica_write_tids;
@@ -248,10 +91,19 @@ class OSD : public Dispatcher {
 
   // -- replication --
   hash_map<repgroup_t, RG*>      rg_map;
+  set<RG*>                       rg_unstable;
+  __uint64_t                     last_tid;
+  map<__uint64_t,RGPeer*>        pull_ops;   // tid -> RGPeer*
+  map<__uint64_t,RGPeer*>        push_ops;   // tid -> RGPeer*
+  map<__uint64_t,RGPeer*>        remove_ops;   // tid -> RGPeer*
+
+  hash_map<object_t, list<Message*> >    waiting_for_object;
+  hash_map<repgroup_t, list<Message*> >  waiting_for_rg;
 
   void get_rg_list(list<repgroup_t>& ls);
   bool rg_exists(repgroup_t rg);
-  RG *open_rg(repgroup_t rg);            // return RG, load state from store (if needed)
+  RG *new_rg(repgroup_t rg);             // create new RG
+  RG *open_rg(repgroup_t rg);            // return existing RG, load state from store (if needed)
   void close_rg(repgroup_t rg);          // close in-memory state
   void remove_rg(repgroup_t rg);         // remove state from store
 
@@ -259,10 +111,27 @@ class OSD : public Dispatcher {
   void peer_notify(int primary, list<repgroup_t>& rg_list);
   void peer_start(int replica, map<RG*,int>& rg_map);
 
+  void do_recovery(RG *rg);
+  void rg_pull(RG *rg, int maxops);
+  void rg_push(RG *rg, int maxops);
+  void rg_clean(RG *rg, int maxops);
+
+  void pull_replica(object_t oid, version_t v, RGPeer *p);
+  void push_replica(object_t oid, version_t v, RGPeer *p);
+  void remove_replica(object_t oid, version_t v, RGPeer *p);
+
   void handle_rg_notify(class MOSDRGNotify *m);
   void handle_rg_peer(class MOSDRGPeer *m);
   void handle_rg_peer_ack(class MOSDRGPeerAck *m);
 
+  void op_rep_pull(class MOSDOp *op);
+  void op_rep_pull_reply(class MOSDOpReply *op);
+  void op_rep_push(class MOSDOp *op);
+  void op_rep_push_reply(class MOSDOpReply *op);
+  void op_rep_remove(class MOSDOp *op);
+  void op_rep_remove_reply(class MOSDOpReply *op);
+
+
  public:
   OSD(int id, Messenger *m);
   ~OSD();
index affe45279023de2a7c3b1f54c9e09fb0c81a0188..8a8d76c56e05b1147cda788c44f74ed19624b54d 100644 (file)
@@ -27,6 +27,8 @@ using namespace std;
 
 #define OID_ONO_BITS       30       // 1mb * 10^9 = 1 petabyte files
 #define OID_INO_BITS       (64-30)  // 2^34 =~ 16 billion files
+#define RG_NUM_BITS        32
+#define RG_REP_BITS        10
 
 //#define MAX_FILE_SIZE      (FILE_OBJECT_SIZE << OID_ONO_BITS)  // 1 PB
 
@@ -162,13 +164,13 @@ class OSDMap {
        // hash (ino+ono).  nrep needs to be reversible (see repgroup_to_nrep).
        static hash<int> H;
        
-       return (H(inode.ino+ono) % g_conf.osd_num_rg) +
-         ((inode.layout.num_rep-1) * g_conf.osd_num_rg);
+       return ((repgroup_t)(H(inode.ino+ono) % g_conf.osd_num_rg) & ((1LL<<RG_NUM_BITS)-1LL)) +
+         ((repgroup_t)inode.layout.num_rep << RG_NUM_BITS);
   }
 
   /* get nrep from rgid */
   int repgroup_to_nrep(repgroup_t rg) {
-       return rg / g_conf.osd_num_rg;
+       return rg >> RG_NUM_BITS;
   }
 
   /* map (repgroup) to a raw list of osds.  
diff --git a/ceph/osd/RG.cc b/ceph/osd/RG.cc
new file mode 100644 (file)
index 0000000..068853a
--- /dev/null
@@ -0,0 +1,204 @@
+
+#include "RG.h"
+#include "config.h"
+
+#undef dout
+#define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << "osd" << whoami << ".rg" << hex << rgid << dec << " "
+
+
+void RG::mark_peered()
+{
+  dout(10) << "mark_peered" << endl;
+  state_set(RG_STATE_PEERED);
+}
+
+void RG::pulled(object_t oid, version_t v, RGPeer *p)
+{
+  dout(10) << "pulled o " << hex << oid << dec << " v " << v << " from " << p->get_peer() << endl;
+
+  local_objects[oid] = v;
+
+  // update peer state
+  p->pulled(oid);
+  
+  objects_loc.erase(oid);  // object is now local
+  if (objects_loc.empty()) {
+       assert(!is_complete());
+       mark_complete();
+       
+       if (!is_clean()) {
+         plan_push();
+         plan_cleanup();
+       }
+  }
+}
+
+void RG::mark_complete()
+{
+  dout(10) << "mark_complete" << endl;
+  assert(!is_complete());
+
+  // done pulling objects!
+  state_set(RG_STATE_COMPLETE);
+  pull_plan.clear();
+  
+  // hose any !complete state
+  objects.clear();
+  objects_loc.clear();
+  deleted_objects.clear();
+}
+
+void RG::pushed(object_t oid, version_t v, RGPeer *p)
+{
+  dout(10) << "pushed o " << hex << oid << dec << " v " << v << " from " << p->get_peer() << endl;
+
+  // update peer state
+  p->pushed(oid);
+  
+  // clean now?
+}
+
+void RG::removed(object_t oid, version_t v, RGPeer *p)
+{
+  dout(10) << "removed o " << hex << oid << dec << " v " << v << " from " << p->get_peer() << endl;
+
+  // update peer state
+  p->removed(oid);
+  
+  // clean now?
+}
+
+
+
+
+void RG::analyze_peers(ObjectStore *store) 
+{
+  dout(10) << "analyze_peers" << endl;
+
+  // compare
+  map<object_t, int>       nreps;    // not quite accurate.  for pull.
+  
+  objects = local_objects;  // start w/ local object set.
+  
+  // newest objects -> objects
+  for (map<int, RGPeer*>::iterator pit = peers.begin();
+          pit != peers.end();
+          pit++) {
+       for (map<object_t, version_t>::iterator oit = pit->second->peer_state.objects.begin();
+                oit != pit->second->peer_state.objects.end();
+                oit++) {
+         // know this object?
+         if (objects.count(oit->first)) {
+               object_t v = objects[oit->first];
+               if (oit->second < v)       // older?
+                 continue;                // useless
+               else if (oit->second == v) // same?
+                 nreps[oit->first]++;     // not quite accurate bc local_objects isn't included in nrep
+               else {                     // newer!
+                 objects[oit->first] = oit->second;
+                 nreps[oit->first] = 0;
+                 objects_loc[oit->first] = pit->first; // note location. this will overwrite and be lame.
+               }
+         } else {
+               // newly seen object!
+               objects[oit->first] = oit->second;
+               nreps[oit->first] = 0;
+               objects_loc[oit->first] = pit->first; // note location. this will overwrite and be lame.
+         }
+       }
+  }
+  
+  // remove deleted objects
+  assim_deleted_objects(deleted_objects);             // locally
+  for (map<int, RGPeer*>::iterator pit = peers.begin();
+          pit != peers.end();
+          pit++) 
+       assim_deleted_objects(pit->second->peer_state.deleted);  // on peers
+  
+  // plan pull
+  // order objects by replication level
+  map<int, list<object_t> > byrep;
+  for (map<object_t, int>::iterator oit = objects_loc.begin();
+          oit != objects_loc.end();
+          oit++) 
+       byrep[nreps[oit->first]].push_back(oit->first);
+  // make plan
+  pull_plan.clear();
+  for (map<int, list<object_t> >::iterator it = byrep.begin();
+          it != byrep.end();
+          it++) {
+       for (list<object_t>::iterator oit = it->second.begin();
+                oit != it->second.end();
+                oit++) {
+         dout(10) << " rg " << hex << rgid << dec << " o " << *oit << " will proxy+pull" << endl;                
+         pull_plan.push_front(*oit, objects[*oit], objects_loc[*oit]);
+       }
+  }
+  
+  // just cleanup old local objects
+  // FIXME: do this async?
+  for (map<object_t, version_t>::iterator it = local_objects.begin();
+          it != local_objects.end();
+          it++) {
+       if (objects.count(it->first) && objects[it->first] == it->second) continue;  // same!
+       
+       dout(10) << " local o " << hex << it->first << dec << " v " << it->second << " old, removing" << endl;
+       store->remove(it->first);
+       local_objects.erase(it->first);
+  }
+}
+
+
+void RG::plan_push()
+{
+  dout(10) << "plan_push" << endl;
+  assert(is_complete());
+  assert(is_peered());
+
+  // push
+  push_plan.clear();
+  for (map<object_t, version_t>::iterator oit = local_objects.begin();
+          oit != local_objects.end();
+          oit++) {
+       for (map<int, RGPeer*>::iterator pit = peers.begin();
+                pit != peers.end();
+                pit++) {
+         RGPeer *rgp = pit->second;
+         if (rgp->get_role() < 0) continue;
+
+         if (rgp->peer_state.objects.count(oit->first) == 0 || 
+                 oit->second < rgp->peer_state.objects[oit->first]) {
+               dout(10) << " remote o " << hex << oit->first << dec << " v " << oit->second << " on osd" << rgp->get_peer() << " old|dne, pushing" << endl;
+               push_plan.push_back(oit->first, oit->second, pit->first);
+         }
+       }
+  }
+}
+
+void RG::plan_cleanup()
+{
+  dout(10) << "plan_cleanup" << endl;
+  assert(is_complete());
+  assert(is_peered());
+
+  // cleanup
+  clean_plan.clear();
+  for (map<int, RGPeer*>::iterator pit = peers.begin();
+          pit != peers.end();
+          pit++) {
+       RGPeer *rgp = pit->second;
+       for (map<object_t, version_t>::iterator oit = pit->second->peer_state.objects.begin();
+                oit != pit->second->peer_state.objects.end();
+                oit++) {
+         if (rgp->get_role() < 0) {
+               dout(10) << " remote o " << hex << oit->first << dec << " v " << oit->second << " on osd" << rgp->get_peer() << " stray, removing" << endl;
+         } 
+         else if (local_objects.count(oit->first) == 0) {
+               dout(10) << " remote o " << hex << oit->first << dec << " v " << oit->second << " on osd" << rgp->get_peer() << " deleted, removing" << endl;
+         } 
+         else continue;
+         clean_plan.push_back(oit->first, oit->second, pit->first);
+       }
+  }
+
+}
diff --git a/ceph/osd/RG.h b/ceph/osd/RG.h
new file mode 100644 (file)
index 0000000..5d6cab4
--- /dev/null
@@ -0,0 +1,295 @@
+
+#include "include/types.h"
+#include "include/bufferlist.h"
+#include "ObjectStore.h"
+
+struct RGReplicaInfo {
+  int state;
+  map<object_t,version_t>  objects;        // remote object list
+  map<object_t,version_t>  deleted;        // remote delete list
+
+  void _encode(bufferlist& blist) {
+       blist.append((char*)&state, sizeof(state));
+       ::_encode(objects, blist);
+       ::_encode(deleted, blist);
+  }
+  void _decode(bufferlist& blist, int& off) {
+       blist.copy(off, sizeof(state), (char*)&state);
+       off += sizeof(state);
+       ::_decode(objects, blist, off);
+       ::_decode(deleted, blist, off);
+  }
+
+  RGReplicaInfo() : state(0) { }
+};
+
+
+/** RGPeer
+ * state associated with non-primary OSDS with RG content.
+ * only used by primary.
+ */
+
+// by primary
+#define RG_PEER_STATE_ACTIVE    1   // peer has acked our request, sent back RG state.
+#define RG_PEER_STATE_COMPLETE  2   // peer has everything replicated
+
+class RGPeer {
+ public:
+  class RG *rg;
+ private:
+  int       peer;
+  int       role;    // 0 primary, 1+ replica, -1 residual
+  int       state;
+
+  // peer state
+ public:
+  RGReplicaInfo peer_state;
+
+ protected:
+  // recovery: for pulling content from (old) replicas
+  map<object_t,version_t>  pulling;
+  map<object_t,version_t>  pushing;
+  map<object_t,version_t>  removing;
+  
+  // replication: for pushing replicas (new or old)
+  map<object_t,version_t>  writing;        // objects i've written to replica
+  map<object_t,version_t>  flushing;       // objects i've written to remote buffer cache only
+
+ public:
+  RGPeer(class RG *rg, int p, int ro) : rg(rg), peer(p), role(ro), state(0) { }
+
+  int get_role() { return role; }
+  int get_peer() { return peer; }
+  bool state_test(int m) { return state & m != 0; }
+  void state_set(int m) { state |= m; }
+  void state_clear(int m) { state &= ~m; }
+
+  bool is_active() { return state_test(RG_PEER_STATE_ACTIVE); }
+  bool is_complete() { return state_test(RG_PEER_STATE_COMPLETE); }
+
+  bool is_residual() { return role < 0; }
+  bool is_empty() { return is_active() && peer_state.objects.empty(); }  // *** && peer_state & COMPLETE
+
+  void pull(object_t o, version_t v) { pulling[o] = v; }
+  bool is_pulling(object_t o) { return pulling.count(o); }
+  version_t pulling_version(object_t o) { return pulling[o]; }
+  void pulled(object_t o) { pulling.erase(o); }
+
+  void push(object_t o, version_t v) { pushing[o] = v; }
+  bool is_pushing(object_t o) { return pushing.count(o); }
+  version_t pushing_version(object_t o) { return pushing[o]; }
+  void pushed(object_t o) { pushing.erase(o); }
+
+  void remove(object_t o, version_t v) { removing[o] = v; }
+  bool is_removing(object_t o) { return removing.count(o); }
+  version_t removing_version(object_t o) { return removing[o]; }
+  void removed(object_t o) { 
+       removing.erase(o); 
+       peer_state.objects.erase(o);
+  }
+
+  int num_active_ops() {
+       return pulling.size() + pushing.size() + removing.size();
+  }
+};
+
+
+
+
+// a task list for moving objects around
+class RGQueue {
+  list<object_t>  objects;
+  list<version_t> versions;
+  list<int>       peers;
+ public:
+  void push_back(object_t o, version_t v, int p) {
+       objects.push_back(o); versions.push_back(v); peers.push_back(p);
+  }
+  void push_front(object_t o, version_t v, int p) {
+       objects.push_front(o); versions.push_front(v); peers.push_front(p);
+  }
+  bool get_next(object_t& o, version_t& v, int& p) {
+       if (objects.empty()) return false;
+       o = objects.front(); v = versions.front(); p = peers.front();
+       objects.pop_front(); versions.pop_front(); peers.pop_front();
+       return true;
+  }
+  void clear() {
+       objects.clear(); versions.clear(); peers.clear();
+  }
+  bool empty() { return objects.empty(); }
+};
+
+
+
+/** RG - Replica Group
+ *
+ */
+
+// bits used on any
+#define RG_STATE_COMPLETE    1  // i have full RG contents locally.
+#define RG_STATE_PEERED      2  // i have contacted prior primary and all
+                                // replica osds and/or fetched their 
+                                // content lists, and thus know what's up.
+                                // or, i have check in w/ new primary (on replica)
+
+// on primary or old-primary only
+#define RG_STATE_CLEAN       4  // i am fully replicated
+
+class RG {
+ protected:
+  int whoami;  // osd, purely for debug output, yucka
+
+  repgroup_t rgid;
+  int        role;    // 0 = primary, 1 = secondary, etc.  -1=undef/none.
+  int        state;   // see bit defns above
+
+  int        primary;         // replica: who the primary is (if not me)
+  set<int>   old_replica_set; // old primary: where replicas used to be
+  
+  map<int, RGPeer*>         peers;  // primary: (soft state) active peers
+
+ public:
+  RGQueue                   pull_plan; 
+  RGQueue                   push_plan;
+  RGQueue                   clean_plan;
+
+  list<class Message*>                 waiting_for_peered;   // any op will hang until peered
+  map<object_t, list<class Message*> > waiting_for_object;   // ops waiting for specific objects.
+
+  // recovery
+  map<object_t, version_t>  local_objects;
+  map<object_t, version_t>  objects;          // what the current object set is
+  map<object_t, int>        objects_loc;      // where latest live
+  
+  // for unstable states,
+  map<object_t, version_t>  deleted_objects;  // locally deleted objects
+
+ public:  
+  RG(int osd, repgroup_t r) : whoami(osd), rgid(r),
+       role(0),
+       state(0),
+       primary(-1) { }
+  
+  repgroup_t get_rgid() { return rgid; }
+  int        get_role() { return role; }
+  int        get_primary() { return primary; }
+
+  void       set_role(int r) { role = r; }
+  void       set_primary(int p) { primary = p; }
+
+  bool       is_primary() { return role == 0; }
+  bool       is_residual() { return role < 0; }
+
+  bool       is_pulling() { return !pull_plan.empty(); }
+  void       pulled(object_t oid, version_t v, RGPeer *p);
+  bool       is_pushing() { return !push_plan.empty(); }
+  void       pushed(object_t oid, version_t v, RGPeer *p);
+  bool       is_removing() { return !push_plan.empty(); }
+  void       removed(object_t oid, version_t v, RGPeer *p);
+
+  RGPeer*    get_proxy_peer(object_t o) { 
+       if (objects_loc.count(o))
+         return get_peer(objects_loc[o]);
+       return 0;
+  }
+  version_t  get_proxy_version(object_t o) { return objects[o]; }
+  
+  int  get_state() { return state; }
+  bool state_test(int m) { return (state & m) != 0; }
+  void set_state(int s) { state = s; }
+  void state_set(int m) { state |= m; }
+  void state_clear(int m) { state &= ~m; }
+
+  bool       is_peered() { return state_test(RG_STATE_PEERED); }
+  void       mark_peered();
+  bool       is_complete() { return state_test(RG_STATE_COMPLETE); }
+  void       mark_complete();
+  bool       is_clean() { return state_test(RG_STATE_CLEAN); }
+  void       mark_clean();
+
+  int num_active_ops() {
+       int o = 0;
+       for (map<int, RGPeer*>::iterator it = peers.begin();
+         it != peers.end();
+                it++) 
+         o += it->second->num_active_ops();
+       return o;
+  }
+
+  map<int, RGPeer*>& get_peers() { return peers; }
+  RGPeer* get_peer(int p) {
+       if (peers.count(p)) return peers[p];
+       return 0;
+  }
+  RGPeer* new_peer(int p, int r) {
+       return peers[p] = new RGPeer(this, p, r);
+  }
+  void remove_peer(int p) {
+       assert(peers.count(p));
+       delete peers[p];
+       peers.erase(p);
+  }
+
+  set<int>&                 get_old_replica_set() { return old_replica_set; }
+  map<object_t, version_t>& get_deleted_objects() { return deleted_objects; }
+
+
+  
+  void store(ObjectStore *store) {
+       if (!store->collection_exists(rgid))
+         store->collection_create(rgid);
+       store->collection_setattr(rgid, "role", &role, sizeof(role));
+       store->collection_setattr(rgid, "primary", &primary, sizeof(primary));
+       store->collection_setattr(rgid, "state", &state, sizeof(state));        
+  }
+  void fetch(ObjectStore *store) {
+       store->collection_getattr(rgid, "role", &role, sizeof(role));
+       store->collection_getattr(rgid, "primary", &primary, sizeof(primary));
+       store->collection_getattr(rgid, "state", &state, sizeof(state));        
+  }
+
+  void add_object(ObjectStore *store, object_t oid) {
+       store->collection_add(rgid, oid);
+  }
+  void remove_object(ObjectStore *store, object_t oid) {
+       store->collection_remove(rgid, oid);
+  }
+  void list_objects(ObjectStore *store, list<object_t>& ls) {
+       store->collection_list(rgid, ls);
+  }
+
+  void scan_local_objects(ObjectStore *store) {
+       list<object_t> olist;
+       local_objects.clear();
+       list_objects(store,olist);
+       for (list<object_t>::iterator it = olist.begin();
+                it != olist.end();
+                it++) {
+         version_t v = 0;
+         store->getattr(*it, 
+                                        "version",
+                                        &v, sizeof(v));
+         local_objects[*it] = v;
+       }
+  }
+
+
+  void assim_deleted_objects(map<object_t,version_t>& dl) {
+       for (map<object_t, version_t>::iterator oit = dl.begin();
+                oit != dl.end();
+                oit++) {
+         if (objects.count(oit->first) == 0) continue;  // dne
+         if (objects[oit->first] < oit->second) {       // deleted.
+               objects.erase(oit->first);
+               objects_loc.erase(oit->first);
+         }
+       }
+  }
+
+  void analyze_peers(ObjectStore *store);
+  void plan_push();
+  void plan_cleanup();
+
+};
+