]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 2 Aug 2005 06:10:32 +0000 (06:10 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 2 Aug 2005 06:10:32 +0000 (06:10 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@479 29311d96-e01e-0410-9327-a35deaab8ce9

15 files changed:
ceph/client/Client.cc
ceph/config.cc
ceph/config.h
ceph/include/bufferlist.h
ceph/mds/MDS.cc
ceph/mds/MDS.h
ceph/messages/MOSDMap.h
ceph/messages/MOSDRGNotify.h [new file with mode: 0644]
ceph/msg/Message.h
ceph/msg/Messenger.cc
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/OSDMap.h
ceph/osdc/Filer.cc
ceph/osdc/Filer.h

index 05280fe4ca472fc3f44f1d7b09f0e7719c92e23b..2176342955013c41ac1d98c0dcf1c5e26d32d6e0 100644 (file)
@@ -425,12 +425,16 @@ void Client::dispatch(Message *m)
   case MSG_OSD_OPREPLY:
        filer->handle_osd_op_reply((MOSDOpReply*)m);
        break;
+  case MSG_OSD_MAP:
+       filer->handle_osd_map((MOSDMap*)m);
+       break;
        
        // client
   case MSG_CLIENT_FILECAPS:
        handle_file_caps((MClientFileCaps*)m);
        break;
 
+
   default:
        cout << "dispatch doesn't recognize message type " << m->get_type() << endl;
        assert(0);  // fail loudly
@@ -629,6 +633,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
 
 
 
+
 // -------------------
 // fs ops
 
index 4a226f90cfe08efa2f54e3ffb79e456dcf8f9003..7912571ff9f78bad4f2cbf0b4eeda05ef3c59b63 100644 (file)
@@ -101,10 +101,12 @@ md_config_t g_conf = {
 
 
   // --- osd ---
+  osd_num_rg: 10000,
   osd_nrep: 1,
   osd_fsync: true,
   osd_writesync: false,
   osd_maxthreads: 10,
+  
 
 
   // --- fakeclient (mds regression testing) ---
@@ -238,6 +240,8 @@ void parse_config_options(int argc, char **argv,
          g_conf.client_bcache_ttl = atoi(argv[++i]);
 
 
+       else if (strcmp(argv[i], "--osd_num_rg") == 0) 
+         g_conf.osd_num_rg = atoi(argv[++i]);
        else if (strcmp(argv[i], "--osd_nrep") == 0) 
          g_conf.osd_nrep = atoi(argv[++i]);
        else if (strcmp(argv[i], "--osd_fsync") == 0) 
index 086f5f1d3a4ae208a9e39a719a3671375b7d0a93..34fabea3c0dc0addfd1f7dfec5c9caf3eaa0cf2c 100644 (file)
@@ -72,6 +72,7 @@ struct md_config_t {
   bool  mds_verify_export_dirauth;     // debug flag
 
   // osd
+  int   osd_num_rg;
   int   osd_nrep;
   bool  osd_fsync;
   bool  osd_writesync;
index 62cedce69bec2dca0e080c4a266ea67d3ef25452..3ffec68eefd7195eeb8778b781d3678b4fff0217 100644 (file)
@@ -466,6 +466,35 @@ inline void _decode(vector<int>& s, bufferlist& bl, int& off)
   assert(s.size() == (unsigned)n);
 }
 
+// list<__uint64_t>
+inline void _encode(list<__uint64_t>& s, bufferlist& bl)
+{
+  int n = s.size();
+  bl.append((char*)&n, sizeof(n));
+  for (list<__uint64_t>::iterator it = s.begin();
+          it != s.end();
+          it++) {
+       __uint64_t v = *it;
+       bl.append((char*)&v, sizeof(v));
+       n--;
+  }
+  assert(n==0);
+}
+inline void _decode(list<__uint64_t>& s, bufferlist& bl, int& off) 
+{
+  s.clear();
+  int n;
+  bl.copy(off, sizeof(n), (char*)&n);
+  off += sizeof(n);
+  for (int i=0; i<n; i++) {
+       __uint64_t v;
+       bl.copy(off, sizeof(v), (char*)&v);
+       off += sizeof(v);
+       s.push_back(v);
+  }
+  assert(s.size() == (unsigned)n);
+}
+
 
 
 
index 9af66421015c7ed3be431dbca5dd3ba45b2dc09b..8a797d2566af564c22469c94f8629c753f9de3f8 100644 (file)
@@ -26,7 +26,7 @@
 #include "messages/MPingAck.h"
 #include "messages/MGenericMessage.h"
 
-#include "messages/MOSDGetMapAck.h"
+#include "messages/MOSDMap.h"
 
 #include "messages/MClientMount.h"
 #include "messages/MClientMountAck.h"
@@ -266,6 +266,37 @@ int MDS::shutdown_final()
 }
 
 
+void MDS::bcast_osd_map()
+{
+  dout(1) << "bcast_osd_map version " << osdmap->get_version() << endl;
+  assert(get_nodeid() == 0);
+
+  // tell mds
+  for (int i=0; i<get_cluster()->get_num_mds(); i++) {
+       messenger->send_message(new MOSDMap(osdmap),
+                                                       MSG_ADDR_MDS(i));
+  }
+  
+  // tell osds
+  set<int> osds;
+  osdmap->get_all_osds(osds);
+  for (set<int>::iterator it = osds.begin();
+          it != osds.end();
+          it++) {
+       messenger->send_message(new MOSDMap(osdmap),
+                                                       MSG_ADDR_OSD(*it));
+  }
+  
+  // tell clients
+  for (set<int>::iterator it = mounted_clients.begin();
+          it != mounted_clients.end();
+          it++) {
+       messenger->send_message(new MOSDMap(osdmap),
+                                                       MSG_ADDR_CLIENT(*it));
+  }
+}
+
+
 
 mds_load_t MDS::get_load()
 {
@@ -299,6 +330,9 @@ void MDS::proc_message(Message *m)
   case MSG_OSD_OPREPLY:
        filer->handle_osd_op_reply((class MOSDOpReply*)m);
        return;
+  case MSG_OSD_MAP:
+       handle_osd_map((MOSDMap*)m);
+       return;
 
   case MSG_OSD_GETMAP:
        handle_osd_getmap(m);
@@ -507,6 +541,24 @@ void MDS::my_dispatch(Message *m)
          }
        }
 
+
+       // HACK osd map change
+       if (0) {
+         static int didit = 0;
+         if (whoami == 0 && 
+                 elapsed.sec() > 10 && !didit) {
+               didit = 1;
+
+               dout(1) << "changing OSD map, removing one OSD" << endl;
+               osdmap->get_group(0).num_osds--;
+               osdmap->init_rush();
+               osdmap->inc_version();
+               
+               // bcast
+               bcast_osd_map();
+         }       
+       }
+
   }
 
   // HACK to force export to test foreign renames
@@ -544,12 +596,33 @@ void MDS::handle_osd_getmap(Message *m)
 {
   dout(7) << "osd_getmap from " << MSG_ADDR_NICE(m->get_source()) << endl;
   
-  messenger->send_message(new MOSDGetMapAck(osdmap),
+  messenger->send_message(new MOSDMap(osdmap),
                                                  m->get_source());
   delete m;
 }
 
 
+void MDS::handle_osd_map(MOSDMap *m)
+{
+  if (!osdmap ||
+         m->get_version() > osdmap->get_version()) {
+       if (osdmap) {
+         dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl;
+       } else {
+         dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl;
+       }
+       
+       osdmap->decode(m->get_osdmap());
+       
+       // kick requests who might be timing out on the wrong osds
+       // ** FIXME **
+       
+  } else {
+       dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
+  }
+}
+
+
 void MDS::handle_client_mount(MClientMount *m)
 {
   // mkfs?  (sorta hack!)
index bc859ed8a170783db25de6a0b53c3e6da373c765..8b28c2d2b187d89197b97eb3dac26744dd5c32be 100644 (file)
@@ -160,6 +160,8 @@ class MDS : public Dispatcher {
   int shutdown_start();
   int shutdown_final();
 
+  void bcast_osd_map();
+
   // messages
   void proc_message(Message *m);
   virtual void dispatch(Message *m);
@@ -184,6 +186,7 @@ class MDS : public Dispatcher {
 
   // osds
   void handle_osd_getmap(Message *m);
+  void handle_osd_map(class MOSDMap *m);
 
   // clients
   void handle_client_mount(class MClientMount *m);
index c47ba8e8069ce17c9de0492aa72c7bd27f1cc4e1..114f95de5d03e68d768ecff317413ead7d1b4aac 100644 (file)
@@ -5,8 +5,9 @@
 #include "osd/OSDMap.h"
 
 
-class MOSDGetMapAck : public Message {
+class MOSDMap : public Message {
   bufferlist osdmap;
+  __uint64_t version;
 
  public:
   // osdmap
@@ -14,19 +15,25 @@ class MOSDGetMapAck : public Message {
        return osdmap;
   }
 
-  MOSDGetMapAck(OSDMap *oc) :
-       Message(MSG_OSD_GETMAPACK) {
+  __uint64_t get_version() { return version; }
+
+  MOSDMap(OSDMap *oc) :
+       Message(MSG_OSD_MAP) {
        oc->encode(osdmap);
+       version = oc->get_version();
   }
-  MOSDGetMapAck() {}
+  MOSDMap() {}
 
 
   // marshalling
   virtual void decode_payload() {
+       payload.copy(0, sizeof(version), (char*)&version);
+       payload.splice(0, sizeof(version));
        osdmap.claim(payload);
   }
   virtual void encode_payload() {
-       payload.claim(osdmap);
+       payload.append((char*)&version, sizeof(version));
+       payload.claim_append(osdmap);
   }
 
   virtual char *get_type_name() { return "ogma"; }
diff --git a/ceph/messages/MOSDRGNotify.h b/ceph/messages/MOSDRGNotify.h
new file mode 100644 (file)
index 0000000..588c950
--- /dev/null
@@ -0,0 +1,35 @@
+#ifndef __MOSDPEER_H
+#define __MOSDPEER_H
+
+#include "msg/Message.h"
+
+
+class MOSDRGNotify : public Message {
+  __uint64_t       map_version;
+  list<repgroup_t> rg_list;
+
+ public:
+  __uint64_t get_version() { return map_version; }
+  list<repgroup_t>& get_rg_list() { return rg_list; }
+
+  MOSDRGNotify() {}
+  MOSDRGNotify(__uint64_t v, list<repgroup_t>& l) :
+       Message(MSG_OSD_RG_NOTIFY) {
+       this->map_version = v;
+       rg_list.splice(rg_list.begin(), l);
+  }
+  
+  char *get_type_name() { return "RGnot"; }
+
+  void encode_payload() {
+       payload.append((char*)&map_version, sizeof(map_version));
+       _encode(rg_list, payload);
+  }
+  void decode_payload() {
+       int off = 0;
+       payload.copy(off, sizeof(map_version), (char*)&map_version);
+       _decode(rg_list, payload, off);
+  }
+};
+
+#endif
index a08592cea045a206a96ba60cb8303b8aee49900a..1e8ac4251884afde849275ef2779f93d399a8df1 100644 (file)
 #define MSG_OSD_PING         16
 
 #define MSG_OSD_GETMAP       17
-#define MSG_OSD_GETMAPACK    18
+#define MSG_OSD_MAP          18
+
+#define MSG_OSD_RG_NOTIFY    50
+#define MSG_OSD_RG_PEER      51
+#define MSG_OSD_RG_PEERACK   52
+
 
 #define MSG_CLIENT_REQUEST         20
 #define MSG_CLIENT_REPLY           21
index 6a687802e9135e23c2fea28d917a95439be69f00..735cbdfe46a62b80083c133446916c2b1f6cc41e 100644 (file)
@@ -20,7 +20,8 @@ using namespace std;
 #include "messages/MOSDPing.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
-#include "messages/MOSDGetMapAck.h"
+#include "messages/MOSDMap.h"
+#include "messages/MOSDRGNotify.h"
 
 #include "messages/MClientMount.h"
 #include "messages/MClientMountAck.h"
@@ -218,8 +219,11 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
   case MSG_OSD_OPREPLY:
        m = new MOSDOpReply();
        break;
-  case MSG_OSD_GETMAPACK:
-       m = new MOSDGetMapAck();
+  case MSG_OSD_MAP:
+       m = new MOSDMap();
+       break;
+  case MSG_OSD_RG_NOTIFY:
+       m = new MOSDRGNotify();
        break;
 
        // clients
index 069043ec2608638c17f6c5fd03846605b9a72a12..77c2ef184433ab7ccec752806db68172e5885c29 100644 (file)
@@ -23,7 +23,8 @@
 #include "messages/MPingAck.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
-#include "messages/MOSDGetMapAck.h"
+#include "messages/MOSDMap.h"
+#include "messages/MOSDRGNotify.h"
 
 #include "common/Logger.h"
 #include "common/LogType.h"
@@ -143,46 +144,6 @@ int OSD::shutdown()
 
 
 
-// ------------------------------------
-// replica groups
-
-void OSD::get_rg_list(list<repgroup_t>& ls)
-{
-  // just list collections; assume they're all rg's (for now)
-  store->list_collections(ls);
-}
-
-
-bool OSD::rg_exists(repgroup_t rg) 
-{
-  struct stat st;
-  if (store->collection_stat(rg, &st) == 0) 
-       return true;
-  else
-       return false;
-}
-
-
-RG *OSD::open_rg(repgroup_t rg)
-{
-  // already open?
-  if (rg_map.count(rg)) 
-       return rg_map[rg];
-
-  // stat collection
-  RG *r = new RG(rg);
-  if (rg_exists(rg)) {
-       // exists
-       r->fetch(store);
-  } else {
-       // dne
-       r->store(store);
-  }
-  rg_map[rg] = r;
-
-  return r;
-}
 
 
 
@@ -202,6 +163,14 @@ void OSD::dispatch(Message *m)
        monitor->proc_message(m);
        break;
   
+       // map and replication
+  case MSG_OSD_MAP:
+       handle_osd_map((MOSDMap*)m);
+       break;
+       
+  case MSG_OSD_RG_NOTIFY:
+       handle_rg_notify((MOSDRGNotify*)m);
+       break;
        
        // osd
   case MSG_SHUTDOWN:
@@ -209,10 +178,6 @@ void OSD::dispatch(Message *m)
        delete m;
        break;
 
-  case MSG_OSD_GETMAPACK:
-       handle_getmap_ack((MOSDGetMapAck*)m);
-       break;
-       
   case MSG_PING:
        // take note.
        monitor->host_is_alive(m->get_source());
@@ -270,6 +235,24 @@ void OSD::handle_ping(MPing *m)
 
 
 
+// =====================================================
+// MAP
+
+void OSD::wait_for_new_map(Message *m)
+{
+  // ask MDS
+  messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), 
+                                                 MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+
+  osd_lock.Lock();
+  waiting_for_osdmap.push_back(m);
+  osd_lock.Unlock();
+}
+
+
+/** update_map
+ * assimilate a new OSDMap.  scan rgs.
+ */
 void OSD::update_map(bufferlist& state)
 {
   // decode new map
@@ -277,11 +260,108 @@ void OSD::update_map(bufferlist& state)
   osdmap->decode(state);
   dout(7) << "update_map version " << osdmap->get_version() << endl;
 
+  // scan known replica groups!
+  scan_rg();
+}
+
+
+void OSD::handle_osd_map(MOSDMap *m)
+{
+  // SAB
+  osd_lock.Lock();
+
+  if (!osdmap ||
+         m->get_version() > osdmap->get_version()) {
+       if (osdmap) {
+         dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl;
+       } else {
+         dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl;
+       }
+
+       update_map(m->get_osdmap());
+       delete m;
+
+       // process waiters
+       list<Message*> waiting;
+       waiting.splice(waiting.begin(), waiting_for_osdmap);
+
+       osd_lock.Unlock();
+       
+       for (list<Message*>::iterator it = waiting.begin();
+                it != waiting.end();
+                it++) {
+         dispatch(*it);
+       }
+  } else {
+       dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
+       osd_lock.Unlock();
+  }
+}
+
+
+
+// ======================================================
+// REPLICATION
+
+
+// ------------------------------------
+// replica groups
+
+void OSD::get_rg_list(list<repgroup_t>& ls)
+{
+  // just list collections; assume they're all rg's (for now)
+  store->list_collections(ls);
+}
+
+
+bool OSD::rg_exists(repgroup_t rg) 
+{
+  struct stat st;
+  if (store->collection_stat(rg, &st) == 0) 
+       return true;
+  else
+       return false;
+}
+
+
+RG *OSD::open_rg(repgroup_t rg)
+{
+  // already open?
+  if (rg_map.count(rg)) 
+       return rg_map[rg];
+
+  // stat collection
+  RG *r = new RG(rg);
+  if (rg_exists(rg)) {
+       // exists
+       r->fetch(store);
+  } else {
+       // dne
+       r->store(store);
+  }
+  rg_map[rg] = r;
+
+  return r;
+}
+
+
+
+
+/** 
+ * scan replica groups, initiate any replication
+ * activities.
+ */
+void OSD::scan_rg()
+{
+  //dout(7) << "scan_rg map version " << osdmap->get_version() << endl;
+
   // scan replica groups
   list<repgroup_t> ls;
   get_rg_list(ls);
   
-  map< int, list<RG*> > primary_ping_queue;
+  map< int, list<repgroup_t> > notify_list;
+  map< int, set<RG*> > start_set;
 
   for (list<repgroup_t>::iterator it = ls.begin();
           it != ls.end();
@@ -301,7 +381,7 @@ void OSD::update_map(bufferlist& state)
 
        if (role != rg->get_role()) {
          // role change.
-         dout(7) << " rg " << rgid << " acting role change " << rg->get_role() << " -> " << role << endl; 
+         dout(10) << " rg " << rgid << " acting role change " << rg->get_role() << " -> " << role << endl; 
          
          // am i old-primary?
          if (rg->get_role() == 0) {
@@ -309,7 +389,7 @@ void OSD::update_map(bufferlist& state)
                for (map<int, RGPeer*>::iterator it = rg->get_peers().begin();
                         it != rg->get_peers().end();
                         it++) {
-                 dout(7) << " rg " << rgid << " old-primary, dropping old peer " << it->first << endl;
+                 dout(10) << " rg " << rgid << " old-primary, dropping old peer " << it->first << endl;
                  rg->get_old_replica_set().insert(it->first);
                  delete it->second;
                }
@@ -320,7 +400,14 @@ void OSD::update_map(bufferlist& state)
          rg->state_clear(RG_STATE_PEERED);
          rg->set_role(role);
          rg->store(store);
-         primary_ping_queue[primary].push_back(rg);
+
+         if (role == 0) {
+               // i am new primary
+               
+         } else {
+               // i am replica
+               notify_list[primary].push_back(rgid);
+         }
          
        } else {
          // no role change.
@@ -330,76 +417,153 @@ void OSD::update_map(bufferlist& state)
                
                // did primary change?
                if (primary != rg->get_primary()) {
-                 dout(7) << " rg " << rgid << " acting primary change " << rg->get_primary() << " -> " << primary << endl;
+                 dout(10) << " rg " << rgid << " acting primary change " << rg->get_primary() << " -> " << primary << endl;
                  
                  // re-peer
                  rg->state_clear(RG_STATE_PEERED);
                  rg->set_primary(primary);
                  rg->store(store);
-                 primary_ping_queue[primary].push_back(rg);
+                 notify_list[primary].push_back(rgid);
                }
          }
-         else if (role == 0) {
-               // i am primary.
-
-               // check replicas
-               for (int r=1; r<nrep; r++) {
-                 if (rg->get_peer(r) == 0) {
-                       dout(7) << " rg " << rgid << " primary not peered with replica " << r << " osd" << acting[r] << endl;
-                       
-                       // ***
-                 } 
-               }
-
+       }
+       
+       if (role == 0) {
+         // i am primary.
+         
+         // old peers
+         // ***
+
+         // check replicas
+         for (int r=1; r<nrep; r++) {
+               if (rg->get_peer(r) == 0) {
+                 dout(10) << " rg " << rgid << " primary needs to peer with replica " << r << " osd" << acting[r] << endl;
+                 start_set[acting[r]].insert(rg);
+               } 
          }
        }
   }
   
 
-  // initiate any new peering sessions!
-  for (map< int, list<RG*> >::iterator pit = primary_ping_queue.begin();
-          pit != primary_ping_queue.end();
-          pit++) {
-       // create peer message
-       int primary = pit->first;
-       
+  // notify?
+  for (map< int, list<repgroup_t> >::iterator pit = notify_list.begin();
+          pit != notify_list.end();
+          pit++)
+       peer_notify(pit->first, pit->second);
 
-       for (list<RG*>::iterator rit = pit->second.begin();
-                rit != pit->second.end();
-                rit++) {
-         // add this RG to peer message
-       }
+  // start peer?
+  for (map< int, set<RG*> >::iterator pit = start_set.begin();
+          pit != start_set.end();
+          pit++)
+       peer_start(pit->first, pit->second);
 
-       // send
-       
-  }
 
 }
 
 
-void OSD::handle_getmap_ack(MOSDGetMapAck *m)
+/** peer_notify
+ * Send an MOSDRGNotify to a primary, with a list of RGs that I have
+ * content for, and they are primary for.
+ */
+void OSD::peer_notify(int primary, list<repgroup_t>& rg_list)
 {
-  // SAB
-  osd_lock.Lock();
+  dout(7) << "peer_notify osd" << primary << " on " << rg_list.size() << " RGs" << endl;
+  MOSDRGNotify *m = new MOSDRGNotify(primary, rg_list);
+  messenger->send_message(m, 
+                                                 MSG_ADDR_OSD(primary));
+}
 
-  update_map(m->get_osdmap());
-  delete m;
 
-  // process waiters
-  list<MOSDOp*> waiting;
-  waiting.splice(waiting.begin(), waiting_for_osdmap);
+/** peer_start
+ * initiate a peer session with a replica on given list of RGs
+ */
+void OSD::peer_start(int replica, set<RG*>& rg_set)
+{
+  dout(7) << "peer_start with osd" << replica << " on " << rg_set.size() << " RGs" << endl;
+  
+  
+}
 
-  list<MOSDOp*> w = waiting;
 
-  osd_lock.Unlock();
 
-  for (list<MOSDOp*>::iterator it = w.begin();
-          it != w.end();
+
+
+void OSD::handle_rg_notify(MOSDRGNotify *m)
+{
+  int from = MSG_ADDR_NUM(m->get_source());
+  dout(7) << "handle_rg_notify from osd" << from << endl;
+
+  // older map?
+  if (m->get_version() < osdmap->get_version()) {
+       dout(7) << "  from old map version " << m->get_version() << " < " << osdmap->get_version() << endl;
+       delete m;   // discard and ignore.*
+       return;
+  }
+
+  // newer map?
+  if (m->get_version() > osdmap->get_version()) {
+       dout(7) << "  for newer map version " << m->get_version() << " > " << osdmap->get_version() << endl;
+       wait_for_new_map(m);
+       return;
+  }
+  
+  assert(m->get_version() == osdmap->get_version());
+  
+  // look for unknown RGs i'm primary for
+  map< int, set<RG*> > start_set;
+
+  for (list<repgroup_t>::iterator it = m->get_rg_list().begin();
+          it != m->get_rg_list().end();
           it++) {
-       handle_op(*it);
+       repgroup_t rgid = *it;
+       
+       int acting[NUM_RUSH_REPLICAS];
+       int nrep = osdmap->repgroup_to_acting_osds(rgid, acting, NUM_RUSH_REPLICAS);
+       assert(acting[0] == whoami);
+       
+       // get/open RG
+       RG *rg = open_rg(rgid);
+
+       // previously unknown RG?
+       if (rg->get_peers().empty()) {
+         dout(10) << " rg " << rgid << " is new" << endl;
+         for (int r=1; r<nrep; r++) {
+               if (rg->get_peer(r) == 0) {
+                 dout(10) << " rg " << rgid << " primary needs to peer with replica " << r << " osd" << acting[r] << endl;
+                 start_set[acting[r]].insert(rg);
+               } 
+         }
+       }
+
+       // peered with this guy specifically?
+       RGPeer *rgp = rg->get_peer(from);
+       if (!rgp) {
+         dout(7) << " not yet peered with osd" << from << " on rg " << rgid << endl;
+         start_set[from].insert(rg);
+       }
   }
+
+  // start peers?
+  if (start_set.empty()) {
+       dout(7) << " no new peers" << endl;
+  } else {
+       for (map< int, set<RG*> >::iterator pit = start_set.begin();
+                pit != start_set.end();
+                pit++)
+         peer_start(pit->first, pit->second);
+  }
+  
+  delete m;
 }
 
+
+
+
+
+// =========================================================
+// OPS
+
+
 void OSD::handle_op(MOSDOp *op)
 {
   // mkfs is special
@@ -411,7 +575,7 @@ void OSD::handle_op(MOSDOp *op)
   // no map?  starting up?
   if (!osdmap) {
     osd_lock.Lock();
-       dout(7) << "no OSDMap, starting up" << endl;
+       dout(7) << "no OSDMap, asking MDS" << endl;
        if (waiting_for_osdmap.empty()) 
          messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), 
                                                          MSG_ADDR_MDS(0), MDS_PORT_MAIN);
@@ -424,17 +588,7 @@ void OSD::handle_op(MOSDOp *op)
   if (op->get_map_version() > osdmap->get_version()) {
        // op's is newer
        dout(7) << "op map " << op->get_map_version() << " > " << osdmap->get_version() << endl;
-       
-       // query MDS
-       dout(7) << "querying MDS" << endl;
-       messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), 
-                                                       MSG_ADDR_MDS(0), MDS_PORT_MAIN);
-
-       assert(0);
-
-       osd_lock.Lock();
-       waiting_for_osdmap.push_back(op);
-       osd_lock.Unlock();
+       wait_for_new_map(op);
        return;
   }
 
index 55bb12ba07ecb74c9b088f8b7477769376a9ecc7..a287f34529e219a02aacc654bcf66eae45297a32 100644 (file)
@@ -174,28 +174,45 @@ class OSD : public Dispatcher {
   Messenger *messenger;
   int whoami;
 
-  class OSDMap  *osdmap;
   class ObjectStore *store;
   class HostMonitor *monitor;
   class Logger      *logger;
+
+  // global lock
+  Mutex osd_lock;
+
+
+  // -- ops --
   class ThreadPool<class OSD, class MOSDOp>  *threadpool;
 
-  list<class MOSDOp*> waiting_for_osdmap;
+  void queue_op(class MOSDOp *m);
+ public:
+  void do_op(class MOSDOp *m);
+  static void doop(OSD *o, MOSDOp *op) {
+       o->do_op(op);
+  };
+
+ protected:
+
+  // -- osd map --
+  class OSDMap  *osdmap;
+  list<class Message*> waiting_for_osdmap;
+
+  void update_map(bufferlist& state);
+  void wait_for_new_map(Message *m);
+  void handle_osd_map(class MOSDMap *m);
 
-  // replica hack
+  
+  // <old replica hack>
   __uint64_t                     last_tid;
   Mutex                          replica_write_lock;
   map<MOSDOp*, Cond*>            replica_write_cond;
   map<MOSDOp*, set<__uint64_t> > replica_write_tids;
   map<__uint64_t, MOSDOp*>       replica_writes;
+  // </hack>
 
-  // global lock
-  Mutex osd_lock;
-
-
-  void update_map(bufferlist& state);
 
-  // rg's
+  // -- replication --
   hash_map<repgroup_t, RG*>      rg_map;
 
   void get_rg_list(list<repgroup_t>& ls);
@@ -204,6 +221,14 @@ class OSD : public Dispatcher {
   void close_rg(repgroup_t rg);          // close in-memory state
   void remove_rg(repgroup_t rg);         // remove state from store
 
+  void scan_rg();
+  void peer_notify(int primary, list<repgroup_t>& rg_list);
+  void peer_start(int replica, set<RG*>& rg_list);
+
+  void handle_rg_notify(class MOSDRGNotify *m);
+  void handle_rg_peer(class MOSDRGPeer *m);
+  void handle_rg_peer_ack(class MOSDRGPeerAck *m);
+
  public:
   OSD(int id, Messenger *m);
   ~OSD();
@@ -212,18 +237,10 @@ class OSD : public Dispatcher {
   int init();
   int shutdown();
 
-  // ops
-  void queue_op(class MOSDOp *m);
-  void do_op(class MOSDOp *m);
-  static void doop(OSD *o, MOSDOp *op) {
-      o->do_op(op);
-    };
-
   // messages
   virtual void dispatch(Message *m);
 
   void handle_ping(class MPing *m);
-  void handle_getmap_ack(class MOSDGetMapAck *m);
   void handle_op(class MOSDOp *m);
   void op_read(class MOSDOp *m);
   void op_write(class MOSDOp *m);
index 583822d1ce2c87398814191d3e0f39e48903a960..33b748838b93c58d0b751cc61d927d052dc63b43 100644 (file)
@@ -23,7 +23,7 @@ using namespace std;
 /*
  * some system constants
  */
-#define NUM_REPLICA_GROUPS   (1<<20)  // ~1M
+//#define NUM_REPLICA_GROUPS   (1<<20)  // ~1M
 #define NUM_RUSH_REPLICAS         4   // this should be big enough to cope w/ failing disks.
 #define MAX_REPLICAS              3
 
@@ -44,9 +44,10 @@ class OSDFileLayout {
   int stripe_size;     // stripe unit, in bytes
   int stripe_count;    // over this many objects
   int object_size;     // until objects are this big, then use a new set of objects.
+  int num_rep;
 
-  OSDFileLayout(int ss, int sc, int os) :
-       stripe_size(ss), stripe_count(sc), object_size(os) { }
+  OSDFileLayout(int ss, int sc, int os, int nr=2) :
+       stripe_size(ss), stripe_count(sc), object_size(os), num_rep(nr) { }
 };
 
 
@@ -109,6 +110,7 @@ class OSDMap {
 
   Mutex  osd_cluster_lock;
 
+ public:
   void init_rush() {
 
     // SAB
@@ -135,6 +137,7 @@ class OSDMap {
   }
 
   __uint64_t get_version() { return version; }
+  void inc_version() { version++; }
 
   // cluster state
   bool is_failed(int osd) { return failed_osds.count(osd) ? true:false; }
@@ -173,12 +176,18 @@ class OSDMap {
 
   /* map (ino, blockno) into a replica group */
   repgroup_t file_to_repgroup(inodeno_t ino, 
-                                                         size_t ono) {
+                                                         size_t ono,
+                                                         int nrep) {
        // something simple for now
        // hash this eventually!
-       return (ino+ono) % NUM_REPLICA_GROUPS;
+       return ((ino+ono) % g_conf.osd_num_rg) +
+         (nrep * g_conf.osd_num_rg);
   }
 
+  /* get nrep from rgid */
+  int repgroup_to_nrep(repgroup_t rg) {
+       return rg / g_conf.osd_num_rg;
+  }
 
   /* map (repgroup) to a list of osds.  
         this is where we invoke RUSH. */
@@ -310,7 +319,7 @@ class OSDMap {
          else {
                ex = &object_extents[oid];
                ex->oid = oid;
-               ex->rg = file_to_repgroup( ino, objectno );
+               ex->rg = file_to_repgroup( ino, objectno, layout.num_rep );
                ex->osd = get_rg_acting_primary( ex->rg );
          }
 
index f52f4a72c7c58d5ff683ae0cf3416710eafcfa85..f4c8bcc3361a46840a33fa8e1db539185da7857d 100644 (file)
@@ -10,6 +10,7 @@
 //#include "messages/MOSDWriteReply.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
+#include "messages/MOSDMap.h"
 
 #include "msg/Messenger.h"
 
@@ -48,6 +49,30 @@ void Filer::dispatch(Message *m)
 }
 
 
+
+void Filer::handle_osd_map(MOSDMap *m)
+{
+  if (!osdmap ||
+         m->get_version() > osdmap->get_version()) {
+       if (osdmap) {
+         dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl;
+       } else {
+         dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl;
+       }
+       
+       osdmap->decode(m->get_osdmap());
+       
+       // kick requests who might be timing out on the wrong osds
+       // ** FIXME **
+
+  } else {
+       dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
+  }
+}
+
+
+
+
 /*
 void Filer::queue_outgoing(Message *m, int osd)
 {
index cf2a5a706371cc29fdf93cd566c1dc7da2d39f10..37c40047493726c1bd86bd3717136f7d0b2e94d9 100644 (file)
@@ -128,6 +128,8 @@ class Filer : public Dispatcher {
   void handle_osd_read_reply(class MOSDOpReply *m);
   void handle_osd_write_reply(class MOSDOpReply *m);
   void handle_osd_op_reply(class MOSDOpReply *m);
+
+  void handle_osd_map(class MOSDMap *m);
   
 };