]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
lots of osd replication/recovery bits
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 2 Aug 2005 00:48:28 +0000 (00:48 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 2 Aug 2005 00:48:28 +0000 (00:48 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@478 29311d96-e01e-0410-9327-a35deaab8ce9

15 files changed:
ceph/TODO
ceph/client/Client.cc
ceph/config.cc
ceph/include/types.h
ceph/mds/MDS.cc
ceph/messages/MOSDOp.h
ceph/messages/MOSDOpReply.h
ceph/osd/BDBMap.h
ceph/osd/FakeStore.cc
ceph/osd/FakeStore.h
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/OSDMap.h
ceph/osd/ObjectStore.h
ceph/osdc/Filer.cc

index ed7386313354d65b8f939b0f655a96b9f768c5e7..7cd44ee93c0408ee566c6f37fe17b3c70e240890 100644 (file)
--- a/ceph/TODO
+++ b/ceph/TODO
@@ -1,9 +1,32 @@
 
-- test hashed readdir
+I think the OSD stuff breaks down info a few different areas:
+
+- Lay out OSD in-memory data structures.  I've started this based on the stuff I posted to the list.
+
+- The object store needs to be able to store the metadata for objects and replica groups, via the ObjectStore interface.  This boils down to collections and extended attributes (on both objects and collections).  The beginnings of both are are implemented in FakeStore, but untested and unfinished.  
+- OSD::op_write needs to put new objects in a rg collection. 
+
+- Map the actual metadata we're keeping to object and collection xattrs.  Probably via fetch/store methods on the in-memory data structures.
+
+- When an OSD gets a new version of the map it needs to go through the process of checking it's active replica groups, seeing if its role has changed, etc.
+
+- The migration process itself.  Needs to be tunable somehow 
+
+- OSD::op_read needs to do the proxying sort of thing, where if it's is primary but !COMPLETE it needs to wait and fetch the object from the (old) replica it currently resides on, or block until old replicas are scanned.
+
+- OSD::op_write needs to do the replication thing.  I think replication should be implemented last, it's probably easier to add once the role changing stuff already works.
+
+
+
+
+
 - interactive hash/unhash interface
+- test hashed readdir
+- make logstream.flush align itself to stipes
+
 - carefully define/document frozen wrt dir_auth vs hashing
 
-- make logstream.flush align itself to stipes
+
 
 
 
index 753d041c3b52aac361134d64943238d75dfd5409..05280fe4ca472fc3f44f1d7b09f0e7719c92e23b 100644 (file)
@@ -1308,6 +1308,7 @@ int Client::close(fh_t fh)
   int result = 0;
 
   // release caps right away?
+  dout(10) << "num_rd " << in->num_rd << "  num_wr " << in->num_wr << endl;
   if (in->num_rd == 0 &&
          in->num_wr == 0) {
        // synchronously; FIXME this is dumb
@@ -1323,23 +1324,19 @@ int Client::close(fh_t fh)
        req->set_caller_uid(getuid());
        req->set_caller_gid(getgid());
 
+       // release caps locally
+       in->file_caps_seq = 0;
+       in->file_caps = 0;
+       in->file_wr_mtime = 0;
+       in->file_wr_size = 0;
+
+       put_inode(in);
+       
+       // make the call .. FIXME there's no reason this has to block!
        MClientReply *reply = make_request(req, true, mds_auth);
        assert(reply);
        int result = reply->get_result();
        assert(result == 0);
-
-       // success?
-       if (in->file_caps_seq == reply->get_file_caps_seq()) {
-         // yup.
-         dout(5) << "successfully released caps" << endl;
-         in->file_caps_seq = 0;
-         in->file_caps = 0;
-         in->file_wr_mtime = 0;
-         in->file_wr_size = 0;
-         put_inode(in);
-       } else {
-         dout(5) << "failed to release caps; i had " << in->file_caps_seq << " mds had " << reply->get_file_caps_seq() << endl;
-       }
   
        delete reply;
   }
index 39290850d3449691e493a2de8c03b66fe830c7ef..4a226f90cfe08efa2f54e3ffb79e456dcf8f9003 100644 (file)
@@ -74,7 +74,7 @@ md_config_t g_conf = {
   client_bcache_maxfrag: 10, // max actual relative # of bheads over opt rel # of bheads
 
   client_trace: 0,
-  fuse_direct_io: 1,
+  fuse_direct_io: 0,
   
   // --- mds ---
   mds_cache_size: MDS_CACHE_SIZE,
index 2f0bbe640babae9161fb8e2138bde9265c9f067b..1e53404097d45fa844d4b21e42a14d0c21751b12 100644 (file)
@@ -131,7 +131,7 @@ struct inode_t {
 
 
 // osd types
-typedef int        repgroup_t;    // replica group
+typedef __uint64_t repgroup_t;    // replica group
 typedef __uint64_t object_t;      // object id
 typedef __uint64_t coll_t;        // collection id
 
index 6f1fe9dd3083676b728eed0fd9809f7d583c9f2c..9af66421015c7ed3be431dbca5dd3ba45b2dc09b 100644 (file)
@@ -487,7 +487,7 @@ void MDS::my_dispatch(Message *m)
        
 
        // HACK to test hashing stuff
-       if (1) {
+       if (0) {
          static map<int,int> didhash;
          if (elapsed.sec() > 15 && !didhash[whoami]) {
                CInode *in = mdcache->get_inode(100000010);
index 254e01999434c529763a0c1edb69fda075f887c3..7a9b1d4a56b1c0a2a6cb0106e301e347ee68e610 100644 (file)
@@ -9,6 +9,9 @@
  * oid - object id
  * op  - OSD_OP_DELETE, etc.
  *
+ * rg_role  - who we want ... 0 == primary, this is what clients/mds will do.
+ * rg_nrep  - how many replicas we want ... just for writes currently?
+ *
  */
 
 #define OSD_OP_READ       1
@@ -29,7 +32,7 @@ typedef struct {
   object_t oid;
   repgroup_t rg;
   int rg_role, rg_nrep;
-  __uint64_t ocv;
+  __uint64_t map_version;
 
   int op;
   size_t length, offset;
@@ -51,7 +54,7 @@ class MOSDOp : public Message {
   repgroup_t get_rg() { return st.rg; }
   int        get_rg_role() { return st.rg_role; }  // who am i asking for?
   int        get_rg_nrep() { return st.rg_nrep; }
-  __uint64_t get_ocv() { return st.ocv; }
+  __uint64_t get_map_version() { return st.map_version; }
 
   int get_op() { return st.op; }
   size_t get_length() { return st.length; }
@@ -71,7 +74,7 @@ class MOSDOp : public Message {
   long get_pcid() { return st.pcid; }
 
   MOSDOp(long tid, msg_addr_t asker, 
-                object_t oid, repgroup_t rg, __uint64_t ocv, int op) :
+                object_t oid, repgroup_t rg, __uint64_t mapversion, int op) :
        Message(MSG_OSD_OP) {
        memset(&st, 0, sizeof(st));
        this->st.tid = tid;
@@ -80,7 +83,7 @@ class MOSDOp : public Message {
        this->st.oid = oid;
        this->st.rg = rg;
        this->st.rg_role = 0;
-       this->st.ocv = ocv;
+       this->st.map_version = mapversion;
        this->st.op = op;
   }
   MOSDOp() {}
index 8250874556ca9d4d9f081847672634d71b2d5d57..e48919228a10f24f08c4f06aab3e66b979ea3ee2 100644 (file)
@@ -29,7 +29,7 @@ typedef struct {
   size_t length, offset;
   size_t object_size;
 
-  __uint64_t _new_ocv;
+  __uint64_t _new_map_version;
   size_t _data_len, _oc_len;
 } MOSDOpReply_st;
 
@@ -64,7 +64,7 @@ class MOSDOpReply : public Message {
   }
 
   // osdmap
-  __uint64_t get_ocv() { return st._new_ocv; }
+  __uint64_t get_map_version() { return st._new_map_version; }
   bufferlist& get_osdmap() { 
        return osdmap;
   }
@@ -87,9 +87,10 @@ class MOSDOpReply : public Message {
        this->st.offset = req->st.offset;
 
        // attach updated cluster spec?
-       if (req->get_ocv() < oc->get_version()) {
+       if (oc &&
+               req->get_map_version() < oc->get_version()) {
          oc->encode(osdmap);
-         st._new_ocv = oc->get_version();
+         st._new_map_version = oc->get_version();
          st._oc_len = osdmap.length();
        }
   }
index 0f44a6538418c74f6b000e2599c8ceccdac5821c..5af5b5c4772c2d308992d7e84cffbcc7edd41740 100644 (file)
@@ -2,48 +2,70 @@
 #define __BERKELEYDB_H
 
 #include <db.h>
+#include <unistd.h>
+
 #include <list>
 using namespace std;
 
+
 template<typename K, typename D>
 class BDBMap {
  private:
   DB *dbp;
   
  public:
-  BDBMap() {
-       int r;
-       if ((r = db_create(&dbp, NULL, 0)) != 0) {
-         cerr << "db_create: " << db_strerror(r) << endl;
-         assert(0);
-       }
-  }
+  BDBMap() : dbp(0) {}
   ~BDBMap() {
        close();
   }
 
+  bool is_open() { return dbp ? true:false; }
+
   // open/close
   int open(const char *fn) {
-       int r = dbp->open(dbp, NULL, fn, NULL, DB_BTREE, DB_CREATE, 0644);
+       //cout << "open " << fn << endl;
+
+       int r;
+       if ((r = db_create(&dbp, NULL, 0)) != 0) {
+         cerr << "db_create: " << db_strerror(r) << endl;
+         assert(0);
+       }
+
+       dbp->set_errfile(dbp, stderr);
+       dbp->set_errpfx(dbp, "bdbmap");
+
+       r = dbp->open(dbp, NULL, fn, NULL, DB_BTREE, DB_CREATE, 0644);
+       if (r != 0) {
+         dbp->err(dbp, r, "%s", fn);
+       }
        assert(r == 0);
        return 0;
   }
   void close() {
-       dbp->close(dbp,0);
-       dbp = 0;
+       if (dbp) {
+         dbp->close(dbp,0);
+         dbp = 0;
+       }
   }
   void remove(const char *fn) {
-       dbp->remove(dbp, fn, 0, 0);
-       dbp = 0;
+       if (!dbp) open(fn);
+       if (dbp) {
+         dbp->remove(dbp, fn, 0, 0);
+         dbp = 0;
+       } else {
+         ::unlink(fn);
+       }
   }
   
   // accessors
   int put(K key,
                  D data) {
        DBT k;
+       memset(&k, 0, sizeof(k)); 
        k.data = &key;
        k.size = sizeof(K);
        DBT d;
+       memset(&d, 0, sizeof(d));
        d.data = &data;
        d.size = sizeof(data);
        return dbp->put(dbp, NULL, &k, &d, 0);
@@ -52,9 +74,11 @@ class BDBMap {
   int get(K key,
                  D& data) {
        DBT k;
+       memset(&k, 0, sizeof(k)); 
        k.data = &key;
        k.size = sizeof(key);
        DBT d;
+       memset(&d, 0, sizeof(d));
        d.data = &data;
        d.size = sizeof(data);
        int r = dbp->get(dbp, NULL, &k, &d, 0);
@@ -63,6 +87,7 @@ class BDBMap {
 
   int del(K key) {
        DBT k;
+       memset(&k, 0, sizeof(k)); 
        k.data = &key;
        k.size = sizeof(key);
        return dbp->del(dbp, NULL, &k, 0);
@@ -73,21 +98,21 @@ class BDBMap {
        int r = dbp->cursor(dbp, NULL, &cursor, 0);
        assert(r == 0);
 
-       K key;
-       D data;
-
        DBT k,d;
-       k.data = &key;
-       k.size = sizeof(key);
-       d.data = &data;
-       d.size = sizeof(data);
+       memset(&k, 0, sizeof(k));
+       memset(&d, 0, sizeof(d));
 
-       while (1) {
-         int r = cursor->c_get(cursor, &k, &d, DB_NEXT);
-         if (r == DB_NOTFOUND) break;
-         assert(r == 0);
+       while ((r = cursor->c_get(cursor, &k, &d, DB_NEXT)) == 0) {
+         K key;
+         assert(k.size == sizeof(key));
+         memcpy(&key, k.data, k.size);
          ls.push_back(key);
        }
+       if (r != DB_NOTFOUND) {
+         dbp->err(dbp, r, "DBcursor->get");
+         assert(r == DB_NOTFOUND);
+       }
+
        cursor->c_close(cursor);
        return 0;
   }
index 2459b77c8bcf72d163f83a758fe28ebe6e775084..0d7036b10957bb8b12a7d644be9809700f5b8294 100644 (file)
@@ -56,6 +56,10 @@ int FakeStore::init()
 int FakeStore::finalize() 
 {
   dout(5) << "finalize" << endl;
+
+  // close collections db files
+  close_collections();
+
   // nothing
   return 0;
 }
@@ -76,6 +80,12 @@ void FakeStore::get_oname(object_t oid, string& fn) {
   fn = basedir + "/" + s;
   //  dout(1) << "oname is " << fn << endl;
 }
+void FakeStore::get_collfn(coll_t c, string &fn) {
+  char s[100];
+  sprintf(s, "%d/%02llx/%016llx.co", whoami, HASH_FUNC(c), c);
+  fn = basedir + "/" + s;
+}
+
 
 
 void FakeStore::wipe_dir(string mydir)
@@ -107,6 +117,8 @@ int FakeStore::mkfs()
 
   dout(1) << "mkfs in " << mydir << endl;
 
+  close_collections();
+
   // make sure my dir exists
   r = ::stat(mydir.c_str(), &st);
   if (r != 0) {
@@ -138,7 +150,7 @@ int FakeStore::mkfs()
        else
          wipe_dir( subdir );
   }
-
+  
   return r;
 }
 
@@ -250,33 +262,107 @@ int FakeStore::write(object_t oid,
 
 
 
+// ------------------
+// attributes
+
+int FakeStore::setattr(object_t oid, const char *name,
+                                          void *value, size_t size)
+{
+  string fn;
+  get_oname(oid, fn);
+  return setxattr(fn.c_str(), name, value, size, 0);
+}
+
+
+int FakeStore::getattr(object_t oid, const char *name,
+                                          void *value, size_t size)
+{
+  string fn;
+  get_oname(oid, fn);
+  return getxattr(fn.c_str(), name, value, size);
+}
+
+int FakeStore::listattr(object_t oid, char *attrs, size_t size)
+{
+  string fn;
+  get_oname(oid, fn);
+  return listxattr(fn.c_str(), attrs, size);
+}
+
 
 
 
 // ------------------
 // collections
 
-void FakeStore::get_collfn(coll_t c, string &fn) {
-  char s[100];
-  sprintf(s, "collection.%02llx", c);
-  fn = basedir;
-  fn += "/";
-  fn += s;
+// helpers
+
+void FakeStore::open_collections() 
+{
+  string cfn;
+  get_dir(cfn);
+  cfn += "/collections";
+  collections.open(cfn.c_str());  
+  list<coll_t> ls;
+  collections.list_keys(ls);
 }
-void FakeStore::open_collection(coll_t c) {
-  if (collection_map.count(c) == 0) {
-       string fn;
-       get_collfn(c,fn);
-       collection_map[c] = new BDBMap<coll_t,int>;
-       collection_map[c]->open(fn.c_str());
+
+void FakeStore::close_collections()
+{
+  if (collections.is_open())
+       collections.close();
+
+  for (map<coll_t, BDBMap<object_t, int>*>::iterator it = collection_map.begin();
+          it != collection_map.end();
+          it++) {
+       it->second->close();
   }
+  collection_map.clear();
 }
+
+
+int FakeStore::open_collection(coll_t c) {
+  if (collection_map.count(c))
+       return 0;  // already open.
+
+  string fn;
+  get_collfn(c,fn);
+  collection_map[c] = new BDBMap<coll_t,int>;
+  int r = collection_map[c]->open(fn.c_str());
+  if (r != 0)
+       collection_map.erase(c);  // failed
+  return r;
+}
+
+// public
+int FakeStore::list_collections(list<coll_t>& ls)
+{
+  if (!collections.is_open()) open_collections();
+
+  ls.clear();
+  collections.list_keys(ls);
+  return 0;
+}
+
+int FakeStore::collection_stat(coll_t c, struct stat *st) {
+  if (!collections.is_open()) open_collections();
+
+  string fn;
+  get_collfn(c,fn);
+  return ::stat(fn.c_str(), st);
+}
+
 int FakeStore::collection_create(coll_t c) {
+  if (!collections.is_open()) open_collections();
+
   collections.put(c, 1);
   open_collection(c);
   return 0;
 }
+
 int FakeStore::collection_destroy(coll_t c) {
+  if (!collections.is_open()) open_collections();
+
   collections.del(c);
   
   open_collection(c);
@@ -289,48 +375,57 @@ int FakeStore::collection_destroy(coll_t c) {
   collection_map.erase(c);
   return 0;
 }
+
 int FakeStore::collection_add(coll_t c, object_t o) {
+  if (!collections.is_open()) open_collections();
+
   open_collection(c);
   collection_map[c]->put(o,1);
   return 0;
 }
 int FakeStore::collection_remove(coll_t c, object_t o) {
+  if (!collections.is_open()) open_collections();
+
   open_collection(c);
   collection_map[c]->del(o);
   return 0;
 }
 int FakeStore::collection_list(coll_t c, list<object_t>& o) {
+  if (!collections.is_open()) open_collections();
+
   open_collection(c);
   collection_map[c]->list_keys(o);
   return 0;
 }
 
-
-
-// ------------------
-// attributes
-
-int FakeStore::setattr(object_t oid, const char *name,
-                                          void *value, size_t size)
+int FakeStore::collection_setattr(coll_t cid, const char *name,
+                                                                 void *value, size_t size)
 {
+  if (!collections.is_open()) open_collections();
+
   string fn;
-  get_oname(oid, fn);
+  get_collfn(cid,fn);
   return setxattr(fn.c_str(), name, value, size, 0);
 }
 
 
-int FakeStore::getattr(object_t oid, const char *name,
+int FakeStore::collection_getattr(coll_t cid, const char *name,
                                           void *value, size_t size)
 {
+  if (!collections.is_open()) open_collections();
+
   string fn;
-  get_oname(oid, fn);
+  get_collfn(cid,fn);
   return getxattr(fn.c_str(), name, value, size);
 }
 
-int FakeStore::listattr(object_t oid, char *attrs, size_t size)
+int FakeStore::collection_listattr(coll_t cid, char *attrs, size_t size)
 {
+  if (!collections.is_open()) open_collections();
+
   string fn;
-  get_oname(oid, fn);
+  get_collfn(cid, fn);
   return listxattr(fn.c_str(), attrs, size);
 }
 
+
index 133e96ef4e0d74781e4b5b66e0373301467893ae..4e5594c073c0a2143c153325d164c050b2b67907 100644 (file)
@@ -39,6 +39,12 @@ class FakeStore : public ObjectStore {
                        char *buffer,
                        bool fsync);
 
+  int setattr(object_t oid, const char *name,
+                               void *value, size_t size);
+  int getattr(object_t oid, const char *name,
+                         void *value, size_t size);
+  int listattr(object_t oid, char *attrs, size_t size);
+
 
   // -------------------
   // collections
@@ -49,24 +55,26 @@ class FakeStore : public ObjectStore {
   map<coll_t, BDBMap<object_t, int>*> collection_map;
 
   void get_collfn(coll_t c, string &fn);
-  void open_collection(coll_t c);
+  int open_collection(coll_t c);
+
+  void open_collections();
+  void close_collections();
 
  public:
+  int list_collections(list<coll_t>& ls);
+  int collection_stat(coll_t c, struct stat *st);
   int collection_create(coll_t c);
   int collection_destroy(coll_t c);
   int collection_add(coll_t c, object_t o);
   int collection_remove(coll_t c, object_t o);
   int collection_list(coll_t c, list<object_t>& o);
 
-
-  // -------------------
-  // attributes
+  int collection_setattr(coll_t c, const char *name,
+                                                void *value, size_t size);
+  int collection_getattr(coll_t c, const char *name,
+                                                void *value, size_t size);
+  int collection_listattr(coll_t c, char *attrs, size_t size);
   
-  int setattr(object_t oid, const char *name,
-                               void *value, size_t size);
-  int getattr(object_t oid, const char *name,
-                         void *value, size_t size);
-  int listattr(object_t oid, char *attrs, size_t size);
 
 };
 
index a45711dc664a065e02d86936f2860608e2157cef..069043ec2608638c17f6c5fd03846605b9a72a12 100644 (file)
@@ -143,6 +143,51 @@ int OSD::shutdown()
 
 
 
+// ------------------------------------
+// replica groups
+
+void OSD::get_rg_list(list<repgroup_t>& ls)
+{
+  // just list collections; assume they're all rg's (for now)
+  store->list_collections(ls);
+}
+
+
+bool OSD::rg_exists(repgroup_t rg) 
+{
+  struct stat st;
+  if (store->collection_stat(rg, &st) == 0) 
+       return true;
+  else
+       return false;
+}
+
+
+RG *OSD::open_rg(repgroup_t rg)
+{
+  // already open?
+  if (rg_map.count(rg)) 
+       return rg_map[rg];
+
+  // stat collection
+  RG *r = new RG(rg);
+  if (rg_exists(rg)) {
+       // exists
+       r->fetch(store);
+  } else {
+       // dne
+       r->store(store);
+  }
+  rg_map[rg] = r;
+
+  return r;
+}
+
+
+
+
+// --------------------------------------
 // dispatch
 
 void OSD::dispatch(Message *m) 
@@ -180,7 +225,7 @@ void OSD::dispatch(Message *m)
        break;
 
 
-       // for replication..
+       // for replication etc.
   case MSG_OSD_OPREPLY:
        monitor->host_is_alive(m->get_source());
        handle_op_reply((MOSDOpReply*)m);
@@ -224,108 +269,197 @@ void OSD::handle_ping(MPing *m)
 }
 
 
+
+void OSD::update_map(bufferlist& state)
+{
+  // decode new map
+  if (!osdmap) osdmap = new OSDMap();
+  osdmap->decode(state);
+  dout(7) << "update_map version " << osdmap->get_version() << endl;
+
+  // scan replica groups
+  list<repgroup_t> ls;
+  get_rg_list(ls);
+  
+  map< int, list<RG*> > primary_ping_queue;
+
+  for (list<repgroup_t>::iterator it = ls.begin();
+          it != ls.end();
+          it++) {
+       repgroup_t rgid = *it;
+       RG *rg = open_rg(rgid);
+       assert(rg);
+
+       // get active rush mapping
+       int acting[NUM_RUSH_REPLICAS];
+       int nrep = osdmap->repgroup_to_acting_osds(rgid, acting, NUM_RUSH_REPLICAS);
+       int primary = acting[0];
+       int role = -1;
+       for (int i=0; i<nrep; i++) 
+         if (acting[i] == whoami) role = i;
+       
+
+       if (role != rg->get_role()) {
+         // role change.
+         dout(7) << " rg " << rgid << " acting role change " << rg->get_role() << " -> " << role << endl; 
+         
+         // am i old-primary?
+         if (rg->get_role() == 0) {
+               // note potential replica set, and drop old peering sessions.
+               for (map<int, RGPeer*>::iterator it = rg->get_peers().begin();
+                        it != rg->get_peers().end();
+                        it++) {
+                 dout(7) << " rg " << rgid << " old-primary, dropping old peer " << it->first << endl;
+                 rg->get_old_replica_set().insert(it->first);
+                 delete it->second;
+               }
+               rg->get_peers().clear();
+         }
+
+         // we need to re-peer
+         rg->state_clear(RG_STATE_PEERED);
+         rg->set_role(role);
+         rg->store(store);
+         primary_ping_queue[primary].push_back(rg);
+         
+       } else {
+         // no role change.
+
+         if (role > 0) {  
+               // i am replica.
+               
+               // did primary change?
+               if (primary != rg->get_primary()) {
+                 dout(7) << " rg " << rgid << " acting primary change " << rg->get_primary() << " -> " << primary << endl;
+                 
+                 // re-peer
+                 rg->state_clear(RG_STATE_PEERED);
+                 rg->set_primary(primary);
+                 rg->store(store);
+                 primary_ping_queue[primary].push_back(rg);
+               }
+         }
+         else if (role == 0) {
+               // i am primary.
+
+               // check replicas
+               for (int r=1; r<nrep; r++) {
+                 if (rg->get_peer(r) == 0) {
+                       dout(7) << " rg " << rgid << " primary not peered with replica " << r << " osd" << acting[r] << endl;
+                       
+                       // ***
+                 } 
+               }
+
+         }
+       }
+  }
+  
+
+  // initiate any new peering sessions!
+  for (map< int, list<RG*> >::iterator pit = primary_ping_queue.begin();
+          pit != primary_ping_queue.end();
+          pit++) {
+       // create peer message
+       int primary = pit->first;
+       
+
+       for (list<RG*>::iterator rit = pit->second.begin();
+                rit != pit->second.end();
+                rit++) {
+         // add this RG to peer message
+       }
+
+       // send
+       
+  }
+
+}
+
+
 void OSD::handle_getmap_ack(MOSDGetMapAck *m)
 {
   // SAB
   osd_lock.Lock();
 
-  if (!osdmap) osdmap = new OSDMap();
-  osdmap->decode(m->get_osdmap());
-  dout(7) << "got OSDMap version " << osdmap->get_version() << endl;
+  update_map(m->get_osdmap());
   delete m;
 
   // process waiters
   list<MOSDOp*> waiting;
   waiting.splice(waiting.begin(), waiting_for_osdmap);
 
-  for (list<MOSDOp*>::iterator it = waiting.begin();
-          it != waiting.end();
+  list<MOSDOp*> w = waiting;
+
+  osd_lock.Unlock();
+
+  for (list<MOSDOp*>::iterator it = w.begin();
+          it != w.end();
           it++) {
        handle_op(*it);
   }
-
-  // SAB
-  osd_lock.Unlock();
 }
 
 void OSD::handle_op(MOSDOp *op)
 {
-  // starting up?
+  // mkfs is special
+  if (op->get_op() == OSD_OP_MKFS) {
+       op_mkfs(op);
+       return;
+  }
 
+  // no map?  starting up?
   if (!osdmap) {
-    // SAB
     osd_lock.Lock();
-
        dout(7) << "no OSDMap, starting up" << endl;
        if (waiting_for_osdmap.empty()) 
          messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), 
                                                          MSG_ADDR_MDS(0), MDS_PORT_MAIN);
        waiting_for_osdmap.push_back(op);
-
-       // SAB
        osd_lock.Unlock();
-
        return;
   }
   
-
-  // check cluster version
-  if (op->get_ocv() > osdmap->get_version()) {
+  // is our map version up to date?
+  if (op->get_map_version() > osdmap->get_version()) {
        // op's is newer
-       dout(7) << "op cluster " << op->get_ocv() << " > " << osdmap->get_version() << endl;
+       dout(7) << "op map " << op->get_map_version() << " > " << osdmap->get_version() << endl;
        
        // query MDS
        dout(7) << "querying MDS" << endl;
        messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), 
                                                        MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+
        assert(0);
 
-       // SAB
        osd_lock.Lock();
-
        waiting_for_osdmap.push_back(op);
-
-       // SAB
        osd_lock.Unlock();
-
        return;
   }
 
-  if (op->get_ocv() < osdmap->get_version()) {
+  // does user have old map?
+  if (op->get_map_version() < osdmap->get_version()) {
        // op's is old
-       dout(7) << "op cluster " << op->get_ocv() << " > " << osdmap->get_version() << endl;
+       dout(7) << "op map " << op->get_map_version() << " < " << osdmap->get_version() << endl;
   }
 
 
-
-  // am i the right rg_role?
-  if (0) {
+  // did this op go to the right OSD?
+  if (op->get_rg_role() == 0) {
     repgroup_t rg = op->get_rg();
-    if (op->get_rg_role() == 0) {
-      // PRIMARY
+       int acting_primary = osdmap->get_rg_acting_primary( rg );
        
-      // verify that we are primary, or acting primary
-      int acting_primary = osdmap->get_rg_acting_primary( op->get_rg() );
-      if (acting_primary != whoami) {
-       dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
-       messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
-       logger->inc("fwd");
-       return;
-      }
-    } else {
-      // REPLICA
-      int my_role = osdmap->get_rg_role(rg, whoami);
-      
-      dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
-      
-      if (my_role != op->get_rg_role()) {
-       assert(0); 
-      }
-    }
+       if (acting_primary != whoami) {
+         dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
+         messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
+         logger->inc("fwd");
+         return;
+       }
   }
 
+  // queue op
   queue_op(op);
-  // do_op(op);
 }
 
 void OSD::queue_op(MOSDOp *op) {
@@ -449,6 +583,9 @@ void OSD::op_write(MOSDOp *op)
   bool write_sync = op->get_rg_role() == 0;  // primary writes synchronously, replicas don't.
 
   
+  // new object?
+  bool existed = store->exists(op->get_oid());
+
   // take buffers from the message
   bufferlist bl;
   bl.claim( op->get_data() );
@@ -470,14 +607,12 @@ void OSD::op_write(MOSDOp *op)
        }
   }
 
-  // trucnate after?
-  /*
-  if (m->get_flags() & OSD_OP_FLAG_TRUNCATE) {
-       size_t at = m->get_offset() + m->get_length();
-       int r = store->truncate(m->get_oid(), at);
-       dout(7) << "truncating object after tail of write at " << at << ", r = " << r << endl;
+  // update object metadata
+  if (!existed) {
+       // add to RG collection
+       RG *r = open_rg(op->get_rg());
+       r->add_object(store, op->get_oid());
   }
-  */
 
   logger->inc("wr");
   logger->inc("wrb", op->get_length());
index b525ff6e78a4603527759e6d4099c2a48c8ca2e7..55bb12ba07ecb74c9b088f8b7477769376a9ecc7 100644 (file)
@@ -7,8 +7,12 @@
 #include "common/Mutex.h"
 #include "common/ThreadPool.h"
 
+#include "ObjectStore.h"
+
 #include <map>
 using namespace std;
+#include <ext/hash_map>
+using namespace __gnu_cxx;
 
 
 class Messenger;
@@ -80,24 +84,66 @@ struct RGPeer {
 #define RG_STATE_CLEAN       4  // i am fully replicated
 
 class RG {
- public:
+ protected:
   repgroup_t rg;
   int        role;    // 0 = primary, 1 = secondary, etc.  -1=undef/none.
-  int        state;   
+  int        state;   // see bit defns above
+
+  int        primary;     // replica: who the primary is (if not me)
+  set<int>   old_replica_set; // old primary: where replicas used to be
   
-  map<int, RGPeer>          peers;
+  map<int, RGPeer*>         peers;  // primary: (soft state) active peers
 
   // for unstable states,
-  map<object_t, version_t>  deleted_objects;  // locally
+  map<object_t, version_t>  deleted_objects;  // locally deleted objects
 
  public:  
-  RG(repgroup_t rg);
+  RG(repgroup_t r) : rg(r),
+       role(0),
+       state(0) { }
   
   repgroup_t get_rg() { return rg; }
   int        get_role() { return role; }
-  int        get_state() { return state; }
+  int        get_primary() { return primary; }
+
+  void       set_role(int r) { role = r; }
+  void       set_primary(int p) { primary = p; }
+
+  map<int, RGPeer*>& get_peers() { return peers; }
+  RGPeer* get_peer(int p) {
+       if (peers.count(p)) return peers[p];
+       return 0;
+  }
+  set<int>   get_old_replica_set() { return old_replica_set; }
+
+  int  get_state() { return state; }
+  bool state_test(int m) { return (state & m) != 0; }
+  void set_state(int s) { state = s; }
+  void state_set(int m) { state |= m; }
+  void state_clear(int m) { state &= ~m; }
   
-  void enumerate_objects(list<object_t>& ls);
+  void store(ObjectStore *store) {
+       if (!store->collection_exists(rg))
+         store->collection_create(rg);
+       store->collection_setattr(rg, "role", &role, sizeof(role));
+       store->collection_setattr(rg, "primary", &primary, sizeof(primary));
+       store->collection_setattr(rg, "state", &state, sizeof(state));  
+  }
+  void fetch(ObjectStore *store) {
+       store->collection_getattr(rg, "role", &role, sizeof(role));
+       store->collection_getattr(rg, "primary", &primary, sizeof(primary));
+       store->collection_getattr(rg, "state", &state, sizeof(state));  
+  }
+
+  void add_object(ObjectStore *store, object_t oid) {
+       store->collection_add(rg, oid);
+  }
+  void remove_object(ObjectStore *store, object_t oid) {
+       store->collection_remove(rg, oid);
+  }
+  void list_objects(ObjectStore *store, list<object_t>& ls) {
+       store->collection_list(rg, ls);
+  }
 };
 
 
@@ -111,6 +157,14 @@ class Onode {
   map<int, version_t> stray_replicas;   // osds w/ stray replicas.
 
  public:
+  Onode(object_t o) : oid(o), version(0) { }
+
+  void store(ObjectStore *store) {
+       
+  }
+  void fetch(ObjectStore *store) {
+
+  }
 
 };
 
@@ -139,6 +193,17 @@ class OSD : public Dispatcher {
   Mutex osd_lock;
 
 
+  void update_map(bufferlist& state);
+
+  // rg's
+  hash_map<repgroup_t, RG*>      rg_map;
+
+  void get_rg_list(list<repgroup_t>& ls);
+  bool rg_exists(repgroup_t rg);
+  RG *open_rg(repgroup_t rg);            // return RG, load state from store (if needed)
+  void close_rg(repgroup_t rg);          // close in-memory state
+  void remove_rg(repgroup_t rg);         // remove state from store
+
  public:
   OSD(int id, Messenger *m);
   ~OSD();
@@ -147,16 +212,13 @@ class OSD : public Dispatcher {
   int init();
   int shutdown();
 
-  // OSDMap
-  void update_osd_map(__uint64_t ocv, bufferlist& blist);
-
+  // ops
   void queue_op(class MOSDOp *m);
   void do_op(class MOSDOp *m);
   static void doop(OSD *o, MOSDOp *op) {
       o->do_op(op);
     };
 
-
   // messages
   virtual void dispatch(Message *m);
 
index 858a2f92e21958d2a86cb17468c2d8ad298c2120..583822d1ce2c87398814191d3e0f39e48903a960 100644 (file)
@@ -181,16 +181,48 @@ class OSDMap {
 
 
   /* map (repgroup) to a list of osds.  
-        this is where we (will eventually) use RUSH. */
+        this is where we invoke RUSH. */
   int repgroup_to_osds(repgroup_t rg,
                                           int *osds,         // list of osd addr's
                                           int num_rep) {     // num replicas we want
        // get rush list
        assert(rush);
        rush->GetServersByKey( rg, num_rep, osds );
-       return 0;
+       return num_rep;
   }
 
+  int repgroup_to_nonfailed_osds(repgroup_t rg,
+                                                                int *osds,         // list of osd addr's
+                                                                int num_rep) {     // num replicas we want
+       // get rush list
+       assert(rush);
+       int group[NUM_RUSH_REPLICAS];
+       rush->GetServersByKey( rg, NUM_RUSH_REPLICAS, group );
+       int o = 0;
+       for (int i=0; i<NUM_RUSH_REPLICAS && o<num_rep; i++) {
+         if (failed_osds.count(group[i])) continue;
+         osds[o++] = group[i];
+       }
+       return o;
+  }
+
+  int repgroup_to_acting_osds(repgroup_t rg,
+                                                         int *osds,         // list of osd addr's
+                                                         int num_rep) {     // num replicas we want
+       // get rush list
+       assert(rush);
+       int group[NUM_RUSH_REPLICAS];
+       rush->GetServersByKey( rg, NUM_RUSH_REPLICAS, group );
+       int o = 0;
+       for (int i=0; i<NUM_RUSH_REPLICAS && o<num_rep; i++) {
+         if (failed_osds.count(group[i])) continue;
+         if (down_osds.count(group[i])) continue;
+         osds[o++] = group[i];
+       }
+       return o;
+  }
+
+
 
   /* map (ino, ono) to an object name
         (to be used on any osd in the proper replica group) */
@@ -206,36 +238,38 @@ class OSDMap {
 
   /* map rg to the primary osd */
   int get_rg_primary(repgroup_t rg) {
-       int group[NUM_RUSH_REPLICAS];
-       repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
-       for (int i=0; i<NUM_RUSH_REPLICAS; i++) {
-         if (failed_osds.count(group[i])) continue;
-         return group[i];
-       }
-       assert(0);
-       return -1;  // we fail!
-
+       int group[1];
+       int nrep = repgroup_to_nonfailed_osds(rg, group, 1);
+       assert(nrep > 0);   // we fail!
+       return group[0];
   }
   /* map rg to the _acting_ primary osd (primary may be down) */
   int get_rg_acting_primary(repgroup_t rg) {
-       int group[NUM_RUSH_REPLICAS];
-       repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
-       for (int i=0; i<NUM_RUSH_REPLICAS; i++) {
-         if (down_osds.count(group[i])) continue;
-         if (failed_osds.count(group[i])) continue;
-         return group[i];
-       }
-       assert(0);
-       return -1;  // we fail!
+       int group[1];
+       int nrep = repgroup_to_acting_osds(rg, group, 1);
+       assert(nrep > 0);  // we fail!
+       return group[0];
   }
 
   /* what replica # is a given osd? 0 primary, -1 for none. */
   int get_rg_role(repgroup_t rg, int osd) {
        int group[NUM_RUSH_REPLICAS];
-       repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
+       int nrep = repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
+       int role = 0;
+       for (int i=0; i<nrep; i++) {
+         if (failed_osds.count(group[i])) continue;
+         if (group[i] == osd) return role;
+         role++;
+       }
+       return -1;  // none
+  }
+  int get_rg_acting_role(repgroup_t rg, int osd) {
+       int group[NUM_RUSH_REPLICAS];
+       int nrep = repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
        int role = 0;
-       for (int i=0; i<NUM_RUSH_REPLICAS; i++) {
+       for (int i=0; i<nrep; i++) {
          if (failed_osds.count(group[i])) continue;
+         if (down_osds.count(group[i])) continue;
          if (group[i] == osd) return role;
          role++;
        }
index cffc941951ff3f5c093b343d951f33af034cb5dd..7af46cd9b1196924e0efcb021e2565b1d021637b 100644 (file)
@@ -3,6 +3,8 @@
 
 #include "include/types.h"
 
+#include <sys/stat.h>
+
 #include <list>
 using namespace std;
 
@@ -34,21 +36,32 @@ class ObjectStore {
                                        size_t len, off_t offset,
                                        char *buffer,
                                        bool fsync=true) = 0;
-
+  
+  virtual int setattr(object_t oid, const char *name,
+                                         void *value, size_t size) {return 0;} //= 0;
+  virtual int getattr(object_t oid, const char *name,
+                                         void *value, size_t size) {return 0;} //= 0;
+  virtual int listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
+  
   // collections
+  virtual int list_collections(list<coll_t>& ls) {return 0;}//= 0;
+  virtual bool collection_exists(coll_t c) {
+       struct stat st;
+       return collection_stat(c, &st) == 0;
+  }
+  virtual int collection_stat(coll_t c, struct stat *st) {return 0;}//= 0;
   virtual int collection_create(coll_t c) {return 0;}//= 0;
   virtual int collection_destroy(coll_t c) {return 0;}//= 0;
   virtual int collection_add(coll_t c, object_t o) {return 0;}//= 0;
   virtual int collection_remove(coll_t c, object_t o) {return 0;}// = 0;
   virtual int collection_list(coll_t c, list<object_t>& o) {return 0;}//= 0;
 
-  // attributes
-  virtual int setattr(object_t oid, const char *name,
-                                         void *value, size_t size) {return 0;} //= 0;
-  virtual int getattr(object_t oid, const char *name,
-                                         void *value, size_t size) {return 0;} //= 0;
-  virtual int listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
-
+  virtual int collection_setattr(object_t oid, const char *name,
+                                                                void *value, size_t size) {return 0;} //= 0;
+  virtual int collection_getattr(object_t oid, const char *name,
+                                                                void *value, size_t size) {return 0;} //= 0;
+  virtual int collection_listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
+  
 };
 
 #endif
index f1cb5318c3b764783eca2ba1bb51a2ca0f28e36e..f52f4a72c7c58d5ff683ae0cf3416710eafcfa85 100644 (file)
@@ -357,9 +357,9 @@ void
 Filer::handle_osd_op_reply(MOSDOpReply *m)
 {
   // updated cluster info?
-  if (m->get_ocv() && 
-         m->get_ocv() > osdmap->get_version()) {
-       dout(3) << "op reply has newer cluster " << m->get_ocv() << " > " << osdmap->get_version() << endl;
+  if (m->get_map_version() && 
+         m->get_map_version() > osdmap->get_version()) {
+       dout(3) << "op reply has newer map " << m->get_map_version() << " > " << osdmap->get_version() << endl;
        osdmap->decode( m->get_osdmap() );
   }