]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 15 Jun 2005 19:58:26 +0000 (19:58 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 15 Jun 2005 19:58:26 +0000 (19:58 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@321 29311d96-e01e-0410-9327-a35deaab8ce9

15 files changed:
ceph/Makefile
ceph/TODO
ceph/config.cc
ceph/mds/CInode.cc
ceph/mds/CInode.h
ceph/mds/MDCache.cc
ceph/mds/MDS.cc
ceph/mds/MDS.h
ceph/messages/MOSDOp.h
ceph/msg/MPIMessenger.cc
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/OSDMap.h
ceph/osd/rush.cc
ceph/test/testmpi.cc [new file with mode: 0644]

index 7d09041c071bacb6d98c738c31303b453894fc2a..e0c73071b4cc5a3ba4989e3ec0fea6f04aa7e644 100644 (file)
@@ -86,6 +86,10 @@ fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OS
 fakefuse: fakefuse.cc mds/allmds.o client/Client.o osd/OSD.o client/fuse.o msg/FakeMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS}
        ${CC} -pg ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
+testmpi: test/testmpi.cc msg/MPIMessenger.cc ${COMMON_OBJS}
+       ${MPICC} ${CFLAGS} ${LIBS} $^ -o $@
+
+
 clean:
        rm -f *.o */*.o ${TARGETS} ${TEST_TARGETS}
 
index 761bb731d886f328d67562512baddfe8b593eb47..3dc47c5bf37797493ac87d1d145d6035c7038107 100644 (file)
--- a/ceph/TODO
+++ b/ceph/TODO
@@ -26,10 +26,6 @@ md tests:
 - log length versus cache size, workload
 
 
-notes and todos.
-- SyntheticClient
-
-- virtual mega-filesystem (metadata only)
 
 
 finish HARD LINKS
index c54c588eb9b1f7ea8510743bcbfacf4125b96f69..c8e69441d48201fb8c7a9248c321a7f274c7ca1f 100644 (file)
@@ -49,7 +49,7 @@ md_config_t g_conf = {
 
   mds_bal_replicate_threshold: 500,
   mds_bal_unreplicate_threshold: 200,
-  mds_bal_interval: 10000,
+  mds_bal_interval: 60,           // seconds
 
   mds_commit_on_shutdown: true,
 
index 6d4ac739fbaed20ff20a807b5cc4786c2b95ed46..58820fa8608aa92bf4616776019ebdb34eda4b8e 100644 (file)
@@ -13,7 +13,7 @@
 #define dout(x)  if (x <= g_conf.debug) cout << "cinode: "
 
 
-map<int, int> cinode_pins;  // counts
+int cinode_pins[CINODE_NUM_PINS];  // counts
 
 
 ostream& operator<<(ostream& out, CInode& in)
index ff873723a8e82c30ea6a5709baa187fea871b1f3..fb0769f966335594eed332d9d05d14d4a04c598e 100644 (file)
@@ -157,7 +157,7 @@ class CInode;
 ostream& operator<<(ostream& out, CInode& in);
 
 
-extern map<int, int> cinode_pins;  // counts
+extern int cinode_pins[CINODE_NUM_PINS];  // counts
 
 
 // cached inode wrapper
index 1a226c0b7e0b3258aed138a618270e313514853a..8868e7e1a8509aec11f48728e2a6cba524b9a48e 100644 (file)
@@ -1499,14 +1499,9 @@ void MDCache::request_cleanup(Message *req)
 
   if (g_conf.log_pins) {
        // pin
-       for (map<int,int>::iterator it = cinode_pins.begin();
-                it != cinode_pins.end();
-                it++) {
-         //string s = "I";
-         //s += cinode_pin_names[it->first];
-         mds->logger2->set(//s, 
-                                               cinode_pin_names[it->first],
-                                               it->second);
+       for (int i=0; i<CINODE_NUM_PINS; i++) {
+         mds->logger2->set(cinode_pin_names[i],
+                                               cinode_pins[i]);
        }
        /*
          for (map<int,int>::iterator it = cdir_pins.begin();
index 10e9fbf343af72a3c9cc2ac7a6bbc01d39e8fe67..07d6bbd5f0a5c39b8d64ab933e510172f390936e 100644 (file)
@@ -112,7 +112,7 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) {
   mds_paused = false;
 
   stat_ops = 0;
-  last_heartbeat = 0;
+  last_balancer_heartbeat = g_clock.gettimepair();
 
   // log
   string name;
@@ -441,9 +441,10 @@ void MDS::my_dispatch(Message *m)
   }
 
   // balance?
+  timepair_t now = g_clock.gettimepair();
   if (true && whoami == 0 &&
-         stat_ops >= last_heartbeat + g_conf.mds_bal_interval) {
-       last_heartbeat = stat_ops;
+         now.first - last_balancer_heartbeat.first >= g_conf.mds_bal_interval) {
+       last_balancer_heartbeat = now;
        balancer->send_heartbeat();
   }
 
index d98776b643a1a0d980df1e80e82bcf3f8be60c1d..a927624691be63c07989e07c57b3e54aede30beb 100644 (file)
@@ -136,7 +136,7 @@ class MDS : public Dispatcher {
 
  protected:
   __uint64_t   stat_ops;
-  __uint64_t   last_heartbeat;
+  timepair_t   last_balancer_heartbeat;
   
  public:
   MDS(MDCluster *mdc, int whoami, Messenger *m);
index 5a351eff9e400d4c366951698b593a8c512833e6..c2e5551f6792514394750d6a88e1bb5753155b2a 100644 (file)
@@ -27,7 +27,7 @@ typedef struct {
   msg_addr_t asker;
 
   object_t oid;
-  repgroup_t rg;
+  repgroup_t rg, rg_role;
   __uint64_t ocv;
 
   int op;
@@ -48,6 +48,7 @@ class MOSDOp : public Message {
 
   object_t get_oid() { return st.oid; }
   repgroup_t get_rg() { return st.rg; }
+  int        get_rg_role() { return st.rg_role; }  // who am i asking for?
   __uint64_t get_ocv() { return st.ocv; }
 
   int get_op() { return st.op; }
@@ -76,11 +77,14 @@ class MOSDOp : public Message {
 
        this->st.oid = oid;
        this->st.rg = rg;
+       this->st.rg_role = 0;
        this->st.ocv = ocv;
        this->st.op = op;
   }
   MOSDOp() {}
 
+  void set_rg_role(int r) { st.rg_role = r; }
+  
   void set_length(size_t l) { st.length = l; }
   void set_offset(size_t o) { st.offset = o; }
 
index e755c56f6aa464a25bb80572d23c86ce13d86d46..d49255a1886f13d1a3df37977dcdd642f2e3e72c 100644 (file)
@@ -35,7 +35,7 @@ int mpi_rank;
 bool mpi_done = false;     // set this flag to stop the event loop
 
 
-#define FUNNEL_MPI        // if we want to funnel mpi through a single thread
+#define FUNNEL_MPI         // if we want to funnel mpi through a single thread
 #define TAG_UNSOLICITED 0
 #define DBLVL 18
 
index 82b9fcc7d3ca0c8d42ab9dcf4b2893dc41ef4cc2..9c03f83a07084e50c8ae45936fa96ed378c1bab0 100644 (file)
@@ -217,7 +217,15 @@ void OSD::handle_op(MOSDOp *op)
   if (op->get_ocv() < osdcluster->get_version()) {
        // op's is old
        dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
+  }
+
+
 
+  // am i the right rg_role?
+  repgroup_t rg = op->get_rg();
+  if (op->get_rg_role() == 0) {
+       // PRIMARY
+       
        // verify that we are primary, or acting primary
        int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
        if (acting_primary != whoami) {
@@ -226,9 +234,20 @@ void OSD::handle_op(MOSDOp *op)
          logger->inc("fwd");
          return;
        }
+  } else {
+       // REPLICA
+       int my_role = osdcluster->get_rg_role(rg, whoami);
+       
+       dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
+
+       if (my_role != op->get_rg_role()) {
+         assert(0); 
+       }
   }
 
   
+
+  
   // do the op
   switch (op->get_op()) {
 
index 25739a3ce1566472bac2dc54a0c252058632c5c8..35fbc52e1b042080583e192058c369142a2c1b9b 100644 (file)
@@ -24,7 +24,7 @@ class Message;
 class ReplicaGroup {
  public:
   repgroup_t rg;
-  int        role;    // 1 = primary, 2 = secondary, etc.  0=undef.
+  int        role;    // 0 = primary, 1 = secondary, etc.  0=undef.
   int        state;   
 
   map<object_t, int>  dirty_map;  // dirty objects
@@ -34,6 +34,8 @@ class ReplicaGroup {
   void enumerate_objects(list<object_t>& ls);
 };
 
+
+
 class OSD : public Dispatcher {
  protected:
   Messenger *messenger;
index 8ffac78ef4406bedb3ac7afffbbf8f7ff8ce173e..a951f6a2fccf6d46c871f501fca3a2653ba97e22 100644 (file)
@@ -183,6 +183,19 @@ class OSDCluster {
        return -1;  // we fail!
   }
 
+  /* what replica # is a given osd? 0 primary, -1 for none. */
+  int get_rg_role(repgroup_t rg, int osd) {
+       int group[NUM_RUSH_REPLICAS];
+       repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
+       int role = 0;
+       for (int i=0; i<NUM_RUSH_REPLICAS; i++) {
+         if (failed_osds.count(group[i])) continue;
+         if (group[i] == osd) return role;
+         role++;
+       }
+       return -1;  // none
+  }
+
   /* map (ino, offset, len) to a (list of) OSDExtents 
         (byte ranges in objects on osds) */
   void file_to_extents(inodeno_t ino,
index 5c6eade301f2c33923d6327d68b787a227613108..574dbae90eb83e01dc4da50b3198f80f2ba1fdbd 100644 (file)
@@ -82,6 +82,9 @@ Rush::GetServersByKey (int key, int nReplicas, int servers[])
   double       myWeight;
   RushRNG      rng;
 
+  // There may not be more replicas than servers!
+  assert (nReplicas <= totalServers);
+  
   for (cluster = nClusters-1; (cluster >= 0) && (replicasLeft > 0); cluster--) {
     if (serversInPrevious[cluster] < replicasLeft) {
       mustAssign = replicasLeft - serversInPrevious[cluster];
@@ -96,13 +99,13 @@ Rush::GetServersByKey (int key, int nReplicas, int servers[])
     rng.Seed (myhash (key)^cluster, cluster^0xb90738);
     numberAssigned = mustAssign +
       rng.HyperGeometricWeighted (toDraw, myWeight,
-                                 totalWeightBefore[cluster] + myWeight,
-                                 clusterWeight[cluster]);
+                                                                 totalWeightBefore[cluster] + myWeight,
+                                                                 clusterWeight[cluster]);
     if (numberAssigned > 0) {
       rng.Seed (myhash (key)^cluster ^ 11, cluster^0xfea937);
       rng.DrawKofN (srv, numberAssigned, clusterSize[cluster]);
       for (i = 0; i < numberAssigned; i++) {
-       srv[i] += serversInPrevious[cluster];
+               srv[i] += serversInPrevious[cluster];
       }
       replicasLeft -= numberAssigned;
       srv += numberAssigned;
diff --git a/ceph/test/testmpi.cc b/ceph/test/testmpi.cc
new file mode 100644 (file)
index 0000000..07d1f09
--- /dev/null
@@ -0,0 +1,44 @@
+#include <sys/stat.h>
+#include <iostream>
+#include <string>
+using namespace std;
+
+#include "include/config.h"
+#include "messages/MPing.h"
+
+#include "msg/MPIMessenger.h"
+
+class Pinger : public Dispatcher {
+public:
+  Messenger *messenger;
+  Pinger(Messenger *m) : messenger(m) {
+       m->set_dispatcher(this);
+  }
+  void dispatch(Message *m) {
+       cout << "got incoming " << m << endl;
+       delete m;
+  }
+};
+
+int main(int argc, char **argv) {
+
+
+  int myrank = mpimessenger_init(argc, argv);
+  int world = mpimessenger_world();
+  
+  Pinger *p = new Pinger( new MPIMessenger(myrank) );
+
+  mpimessenger_start();
+
+  while (1) {
+       
+       // ping random nodes
+       int d = rand() % world;
+       p->messenger->send_message(new MPing(), d);
+
+  }
+
+  
+  mpimessenger_wait();
+  mpimessenger_shutdown();  // shutdown MPI
+}