]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
monitor switchover;
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 25 Aug 2006 02:43:26 +0000 (02:43 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 25 Aug 2006 02:43:26 +0000 (02:43 +0000)
makefile changes;
ebofs no-write aging (untested)

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@815 29311d96-e01e-0410-9327-a35deaab8ce9

21 files changed:
ceph/Makefile
ceph/config.cc
ceph/config.h
ceph/ebofs/BlockDevice.cc
ceph/ebofs/Ebofs.cc
ceph/ebofs/Ebofs.h
ceph/fakesyn.cc
ceph/mds/OSDMonitor.cc
ceph/mds/OSDMonitor.h
ceph/messages/MOSDPing.h
ceph/mon/Monitor.cc
ceph/mon/Monitor.h
ceph/msg/FakeMessenger.cc
ceph/msg/Message.cc
ceph/msg/Message.h
ceph/newsyn.cc
ceph/osd/Ager.cc
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/ObjectStore.h
ceph/tcpsyn.cc

index 8d0c0ca9341b6e41b598b94c0eed1bca763a3ac1..415bc88cfd0e94a9d09a2b8507e1b135b64c5b53 100644 (file)
@@ -39,14 +39,12 @@ MDS_OBJS= \
        mds/MDStore.o\
        mds/LogStream.o\
        mds/IdAllocator.o\
-       mds/MDLog.o\
-       mds/OSDMonitor.o 
+       mds/MDLog.o
 
 OSD_OBJS= \
        osd/PG.o\
        osd/Ager.o\
        osd/FakeStore.o\
-       ebofs.o\
        osd/OSD.o
 
 OSDC_OBJS= \
@@ -54,6 +52,10 @@ OSDC_OBJS= \
        osdc/ObjectCacher.o\
        osdc/Objecter.o
 
+MON_OBJS= \
+       mon/Monitor.o\
+       mon/Elector.o
+
 COMMON_OBJS= \
        msg/Messenger.o\
        msg/Message.o\
@@ -62,8 +64,6 @@ COMMON_OBJS= \
        common/Logger.o\
        common/Clock.o\
        common/Timer.o\
-       mon/Monitor.o\
-       mon/Elector.o\
        config.o
 
 
@@ -89,7 +89,10 @@ obfs: depend obfstest
 
 
 # real bits
-cosd: cosd.cc osd.o msg/NewMessenger.o common.o
+cmon: cmon.cc mon.o ebofs.o msg/NewMessenger.o common.o
+       ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@
+
+cosd: cosd.cc osd.o ebofs.o msg/NewMessenger.o common.o
        ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@
 
 cmds: cmds.cc mds.o osdc.o msg/NewMessenger.o common.o
@@ -106,7 +109,7 @@ gprof-helper.so: test/gprof-helper.c
 
 
 # fuse
-fakefuse: fakefuse.cc mds.o client.o osd.o client/fuse.o msg/FakeMessenger.cc common.o
+fakefuse: fakefuse.cc mds.o client.o osd.o ebofs.o client/fuse.o msg/FakeMessenger.cc common.o
        ${CC} -pg ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
 tcpfuse: tcpfuse.cc mds.o client.o client/fuse.o ${TCP_OBJS} common.o
@@ -117,19 +120,16 @@ mpifuse: mpifuse.cc mds.o client.o client/fuse.o ${TCP_OBJS} common.o
 
 
 # synthetic workload
-fakemon: fakemon.cc mds.o client.o osd.o osdc.o msg/FakeMessenger.o common.o
+fakemon: fakemon.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o
        ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
 
-fakesyn: fakesyn.cc mds.o client.o osd.o osdc.o msg/FakeMessenger.o common.o
+fakesyn: fakesyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o
        ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
 
-mpisyn: mpisyn.cc mds.o client.o osd.o osdc.o msg/MPIMessenger.cc common.o
+tcpsyn: tcpsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o ${TCP_OBJS} common.o
        ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
-tcpsyn: tcpsyn.cc mds.o client.o osd.o osdc.o ${TCP_OBJS} common.o
-       ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
-
-newsyn: newsyn.cc mds.o client.o osd.o osdc.o msg/NewMessenger.o common.o
+newsyn: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/NewMessenger.o common.o
        ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
 # + obfs
@@ -199,6 +199,9 @@ osd_obfs.o: osd/OBFSStore.o osd/OSD.ccosd/PG.o osd/ObjectStore.o osd/FakeStore.o
 mds.o: ${MDS_OBJS}
        ld -i -o $@ $^
 
+mon.o: ${MON_OBJS}
+       ld -i -o $@ $^
+
 %.o: %.cc
        ${CC} ${CFLAGS} -c $< -o $@
 
index a04a3f19b47f87744a454b7ebccce2075b6a5db6..3b748d71ebe61021aaa58adc00fed71cff481b01 100644 (file)
@@ -41,7 +41,7 @@ map<int,float> g_fake_osd_out;
 md_config_t g_debug_after_conf;
 
 md_config_t g_conf = {
-  num_mon: 5,
+  num_mon: 1,
   num_mds: 1,
   num_osd: 4,
   num_client: 1,
@@ -58,7 +58,9 @@ md_config_t g_conf = {
 
   fake_clock: false,
   fakemessenger_serialize: true,
+
   fake_osdmap_expand: 0,
+  fake_osdmap_updates: 0,
 
   osd_remount_at: 0,
 
@@ -84,17 +86,22 @@ md_config_t g_conf = {
   
   debug_after: 0,
 
-  tcp_skip_rank0: false,
+  /*tcp_skip_rank0: false,
   tcp_overlay_clients: false,  // over osds!
   tcp_log: false,
   tcp_serial_marshall: true,
   tcp_serial_out: false,
   tcp_multi_out: true,
   tcp_multi_dispatch: false,  // not fully implemented yet
+  */
 
   ms_single_dispatch: false,
   ms_requeue_on_sender_fail: false,
 
+  ms_stripe_osds: false,
+  ms_skip_rank0: false,
+  ms_overlay_clients: false,
+
   // --- mon ---
   mon_tick_interval: 5,
   mon_osd_down_out_interval: 10,  // seconds
@@ -219,6 +226,7 @@ md_config_t g_conf = {
   uofs_delay_allocation:       (int)1,         //true
 
   // --- block device ---
+  bdev_lock: true,
   bdev_iothreads:    1,         // number of ios to queue with kernel
   bdev_idle_kick_after_ms: 0,//100, // ms   ** FIXME ** this seems to break things, not sure why yet **
   bdev_el_fw_max_ms: 1000,      // restart elevator at least once every 1000 ms
@@ -326,23 +334,28 @@ void parse_config_options(vector<char*>& args)
        else if (strcmp(args[i], "--numosd") == 0) 
          g_conf.num_osd = atoi(args[++i]);
 
-       else if (strcmp(args[i], "--tcp_skip_rank0") == 0)
-         g_conf.tcp_skip_rank0 = true;
-       else if (strcmp(args[i], "--tcp_overlay_clients") == 0)
-         g_conf.tcp_overlay_clients = true;
-       else if (strcmp(args[i], "--tcp_log") == 0)
+       else if (strcmp(args[i], "--ms_single_dispatch") == 0) 
+         g_conf.ms_single_dispatch = atoi(args[++i]);
+       else if (strcmp(args[i], "--ms_stripe_osds") == 0)
+         g_conf.ms_stripe_osds = true;
+       else if (strcmp(args[i], "--ms_skip_rank0") == 0)
+         g_conf.ms_skip_rank0 = true;
+       else if (strcmp(args[i], "--ms_overlay_clients") == 0)
+         g_conf.ms_overlay_clients = true;
+
+       /*else if (strcmp(args[i], "--tcp_log") == 0)
          g_conf.tcp_log = true;
        else if (strcmp(args[i], "--tcp_multi_out") == 0)
          g_conf.tcp_multi_out = atoi(args[++i]);
-
-       else if (strcmp(args[i], "--ms_single_dispatch") == 0) 
-         g_conf.ms_single_dispatch = atoi(args[++i]);
+       */
 
        else if (strcmp(args[i], "--mkfs") == 0) 
          g_conf.osd_mkfs = g_conf.mkfs = 1; //atoi(args[++i]);
 
        else if (strcmp(args[i], "--fake_osdmap_expand") == 0) 
          g_conf.fake_osdmap_expand = atoi(args[++i]);
+       else if (strcmp(args[i], "--fake_osdmap_updates") == 0) 
+         g_conf.fake_osdmap_updates = atoi(args[++i]);
        else if (strcmp(args[i], "--fake_osd_down") == 0) {
          int osd = atoi(args[++i]);
          float when = atof(args[++i]);
@@ -577,6 +590,8 @@ void parse_config_options(vector<char*>& args)
          g_conf.osd_maxthreads = atoi(args[++i]);
 
 
+       else if (strcmp(args[i], "--bdev_lock") == 0) 
+         g_conf.bdev_lock = atoi(args[++i]);
        else if (strcmp(args[i], "--bdev_el_bidir") == 0) 
          g_conf.bdev_el_bidir = atoi(args[++i]);
        else if (strcmp(args[i], "--bdev_iothreads") == 0) 
index 397fe84781b7689d612742364105b2d6fccd0126..5b596404b0da0ea8a47d66aaf1276e3768bf0b03 100644 (file)
@@ -33,6 +33,7 @@ struct md_config_t {
   bool fakemessenger_serialize;
 
   int fake_osdmap_expand;
+  int fake_osdmap_updates;
 
   int osd_remount_at;
 
@@ -58,17 +59,22 @@ struct md_config_t {
 
   int debug_after;
 
-  bool tcp_skip_rank0;
+  /*bool tcp_skip_rank0;
   bool tcp_overlay_clients;
   bool tcp_log;
   bool tcp_serial_marshall;
   bool tcp_serial_out;
   bool tcp_multi_out;
   bool tcp_multi_dispatch;
+  */
 
   bool ms_single_dispatch;
   bool ms_requeue_on_sender_fail;
 
+  bool ms_stripe_osds;
+  bool ms_skip_rank0;
+  bool ms_overlay_clients;
+
   // mon
   int mon_tick_interval;
   int mon_osd_down_out_interval;
@@ -191,6 +197,7 @@ struct md_config_t {
   int     uofs_delay_allocation;
 
   // block device
+  bool  bdev_lock;
   int   bdev_iothreads;
   int   bdev_idle_kick_after_ms;
   int   bdev_el_fw_max_ms;  
index 7c605b8edf2f8c7cf5b07841bb8f7b5508413bd0..af3ff1b21a0ed05ddd59a6cea3f3de2ec216d945 100644 (file)
@@ -577,11 +577,13 @@ int BlockDevice::open(kicker *idle)
   }
 
   // lock
-  int r = ::flock(fd, LOCK_EX);
-  if (r < 0) {
-       dout(1) << "open " << dev << " failed to get LOCK_EX" << endl;
-       assert(0);
-       return -1;
+  if (g_conf.bdev_lock) {
+       int r = ::flock(fd, LOCK_EX);
+       if (r < 0) {
+         dout(1) << "open " << dev << " failed to get LOCK_EX" << endl;
+         assert(0);
+         return -1;
+       }
   }
                           
   // figure size
@@ -634,7 +636,9 @@ int BlockDevice::close()
 
   dout(2) << "close " << endl;
 
-  ::flock(fd, LOCK_UN);
+  if (g_conf.bdev_lock)
+       ::flock(fd, LOCK_UN);
+
   ::close(fd);
   fd = 0;
 
index af9346648f4aa07c0ba45874be179675c55b17f0..5c634fb5cd360be523761ee1267ae5fe5fb61a69 100644 (file)
@@ -1343,6 +1343,11 @@ void Ebofs::apply_write(Onode *on, off_t off, size_t len, bufferlist& bl)
   alloc_write(on, bstart, blen, alloc, old_bfirst, old_blast);
   dout(20) << "apply_write  old_bfirst " << old_bfirst << ", old_blast " << old_blast << endl;
 
+  if (fake_writes) {
+       on->uncommitted.clear();   // worst case!
+       return;
+  }    
+
   // map b range onto buffer_heads
   map<block_t, BufferHead*> hits;
   oc->map_write(on, bstart, blen, alloc, hits);
index b2c15f9d50e1b39985205add26f2c8fa61ece976..01784f77b6acd6916143f5b2fa0403b5bf0cd0dd 100644 (file)
@@ -45,6 +45,9 @@ class Ebofs : public ObjectStore {
  protected:
   Mutex        ebofs_lock;    // a beautiful global lock
 
+  // ** debuggy **
+  bool         fake_writes;
+
   // ** super **
   BlockDevice  dev;
   bool         mounted, unmounting, dirty;
@@ -203,6 +206,7 @@ class Ebofs : public ObjectStore {
 
  public:
   Ebofs(char *devfn) : 
+       fake_writes(false),
        dev(devfn), 
        mounted(false), unmounting(false), dirty(false), readonly(false), 
        super_epoch(0), commit_thread_started(false), mid_commit(false),
@@ -268,6 +272,7 @@ class Ebofs : public ObjectStore {
   int collection_rmattr(coll_t cid, const char *name, Context *onsafe);
   int collection_listattr(object_t oid, vector<string>& attrs);
   
+  void _fake_writes(bool b) { fake_writes = b; }
 
 private:
   // private interface -- use if caller already holds lock
index 652617fa8ccc024321ec56bc84e004f5be7dab76..a647cc340397f202adef48da436f0ac32fd92505 100644 (file)
@@ -8,10 +8,12 @@ using namespace std;
 #include "config.h"
 
 #include "mds/MDCluster.h"
+
 #include "mds/MDS.h"
 #include "osd/OSD.h"
-#include "mds/OSDMonitor.h"
+#include "mon/Monitor.h"
 #include "client/Client.h"
+
 #include "client/SyntheticClient.h"
 
 #include "msg/FakeMessenger.h"
@@ -63,8 +65,10 @@ int main(int argc, char **argv)
   //int pid = getpid();
 
   // create mon
-  OSDMonitor *mon = new OSDMonitor(0, new FakeMessenger(MSG_ADDR_MON(0)));
-  mon->init();
+  Monitor *mon[g_conf.num_mon];
+  for (int i=0; i<g_conf.num_mon; i++) {
+       mon[i] = new Monitor(i, new FakeMessenger(MSG_ADDR_MON(i)));
+  }
 
   // create mds
   MDS *mds[NUMMDS];
@@ -99,6 +103,9 @@ int main(int argc, char **argv)
   fakemessenger_startthread();
   
   // init
+  for (int i=0; i<g_conf.num_mon; i++) {
+       mon[i]->init();
+  }
   for (int i=0; i<NUMMDS; i++) {
        mds[i]->init();
        if (g_conf.mds_local_osd)
index 5ec92372921b31b4fa7171c48426414683714ad4..0cbeba12b5df897be9c4608c8e68c1367ed4a0e2 100644 (file)
@@ -26,6 +26,8 @@
 #include "messages/MOSDMap.h"
 #include "messages/MOSDGetMap.h"
 #include "messages/MOSDBoot.h"
+#include "messages/MOSDIn.h"
+#include "messages/MOSDOut.h"
 
 #include "common/Timer.h"
 #include "common/Clock.h"
@@ -69,32 +71,33 @@ public:
 
 
 
+void OSDMonitor::fake_osdmap_update()
+{
+  dout(1) << "fake_osdmap_update" << endl;
+  accept_pending();
+
+  // tell a random osd
+  send_incremental_map(osdmap->get_epoch()-1,                    // ick! FIXME
+                                          MSG_ADDR_OSD(rand() % g_conf.num_osd));
+}
+
 
 void OSDMonitor::fake_reorg() 
 {
+  int r = rand() % g_conf.num_osd;
   
-  // HACK osd map change
-  static int d = 0;
-
-  if (d > 0) {
-       dout(1) << "changing OSD map, marking osd" << d-1 << " out" << endl;
-       osdmap->mark_out(d-1);
+  if (osdmap->is_out(r)) {
+       dout(1) << "fake_reorg marking osd" << r << " in" << endl;
+       pending.new_in.push_back(r);
+  } else {
+       dout(1) << "fake_reorg marking osd" << r << " out" << endl;
+       pending.new_out.push_back(r);
   }
 
-  dout(1) << "changing OSD map, marking osd" << d << " down" << endl;
-  osdmap->mark_down(d);
-
-  osdmap->inc_epoch();
-  d++;
+  accept_pending();
   
-  // bcast
-  bcast_latest_osd_map_osd();
-    
-  // do it again?
-  if (g_conf.num_osd - d > 4 &&
-         g_conf.num_osd - d > g_conf.num_osd/2)
-       g_timer.add_event_after(g_conf.fake_osdmap_expand,
-                                                       new C_OM_Faker(this));
+  // tell him!
+  send_incremental_map(osdmap->get_epoch()-1, MSG_ADDR_OSD(r));
 }
 
 
@@ -189,6 +192,13 @@ void OSDMonitor::dispatch(Message *m)
        handle_osd_boot((MOSDBoot*)m);
        return;
 
+  case MSG_OSD_IN:
+       handle_osd_in((MOSDIn*)m);
+       break;
+  case MSG_OSD_OUT:
+       handle_osd_out((MOSDOut*)m);
+       break;
+
   case MSG_SHUTDOWN:
        handle_shutdown(m);
        return;
@@ -318,6 +328,29 @@ void OSDMonitor::handle_osd_boot(MOSDBoot *m)
   bcast_latest_osd_map_mds();
 }
 
+void OSDMonitor::handle_osd_in(MOSDIn *m)
+{
+  dout(7) << "osd_in from " << m->get_source() << endl;
+  int from = m->get_source().num();
+  if (osdmap->is_out(from)) {
+       pending.new_in.push_back(from);
+       accept_pending();
+       send_incremental_map(m->map_epoch, m->get_source());
+  }
+}
+
+void OSDMonitor::handle_osd_out(MOSDOut *m)
+{
+  dout(7) << "osd_out from " << m->get_source() << endl;
+  int from = m->get_source().num();
+  if (osdmap->is_in(from)) {
+       pending.new_out.push_back(from);
+       accept_pending();
+       send_incremental_map(m->map_epoch, m->get_source());
+  }
+}
+
+
 void OSDMonitor::handle_osd_getmap(MOSDGetMap *m)
 {
   dout(7) << "osd_getmap from " << m->get_source() << " since " << m->get_since() << endl;
index e129473dbbb8075f1c21b989a2595d565ddb5089..bb80210d5736a853eb4be743ea5fb09a71256952 100644 (file)
@@ -68,6 +68,8 @@ class OSDMonitor : public Dispatcher {
   void handle_shutdown(Message *m);
 
   void handle_osd_boot(class MOSDBoot *m);
+  void handle_osd_in(class MOSDIn *m);
+  void handle_osd_out(class MOSDOut *m);
   void handle_osd_failure(class MOSDFailure *m);
   void handle_osd_getmap(class MOSDGetMap *m);
 
@@ -75,6 +77,7 @@ class OSDMonitor : public Dispatcher {
 
   // hack
   void fake_osd_failure(int osd, bool down);
+  void fake_osdmap_update();
   void fake_reorg();
 
 };
index 14bef3052182f99cdd0ea857e3d24def7216fb97..a4205bfb3ea8fb18d7288de89b2c37bb207b05a1 100644 (file)
@@ -22,8 +22,9 @@
 class MOSDPing : public Message {
  public:
   epoch_t map_epoch;
+  bool ack;
 
-  MOSDPing(epoch_t e) : Message(MSG_OSD_PING), map_epoch(e) {
+  MOSDPing(epoch_t e, bool a=false) : Message(MSG_OSD_PING), map_epoch(e), ack(a) {
   }
   MOSDPing() {}
 
@@ -31,9 +32,12 @@ class MOSDPing : public Message {
        int off = 0;
        payload.copy(off, sizeof(map_epoch), (char*)&map_epoch);
        off += sizeof(map_epoch);
+       payload.copy(off, sizeof(ack), (char*)&ack);
+       off += sizeof(ack);
   }
   virtual void encode_payload() {
        payload.append((char*)&map_epoch, sizeof(map_epoch));
+       payload.append((char*)&ack, sizeof(ack));
   }
 
   virtual char *get_type_name() { return "oping"; }
index be498118dbcf339ba2c0ec3a5445cbe3a1a7ed99..bd4947009d5e0ec4a229d8f975b3c72f82e0abf3 100644 (file)
 
 #include "messages/MPing.h"
 #include "messages/MPingAck.h"
-#include "messages/MFailure.h"
-#include "messages/MFailureAck.h"
+#include "messages/MOSDFailure.h"
 #include "messages/MOSDMap.h"
 #include "messages/MOSDGetMap.h"
 #include "messages/MOSDBoot.h"
+#include "messages/MOSDIn.h"
+#include "messages/MOSDOut.h"
 
 #include "common/Timer.h"
 #include "common/Clock.h"
@@ -58,26 +59,34 @@ public:
 };
 
 
+void Monitor::fake_osdmap_update()
+{
+  dout(1) << "fake_osdmap_update" << endl;
+  accept_pending();
+
+  // tell a random osd
+  send_incremental_map(osdmap->get_epoch()-1,                    // ick! FIXME
+                                          MSG_ADDR_OSD(rand() % g_conf.num_osd));
+}
+
+
 void Monitor::fake_reorg() 
 {
+  int r = rand() % g_conf.num_osd;
   
-  // HACK osd map change
-  static int d = 0;
-
-  if (d > 0) {
-       dout(1) << "changing OSD map, marking osd" << d-1 << " out" << endl;
-       osdmap->mark_out(d-1);
+  if (osdmap->is_out(r)) {
+       dout(1) << "fake_reorg marking osd" << r << " in" << endl;
+       pending.new_in.push_back(r);
+  } else {
+       dout(1) << "fake_reorg marking osd" << r << " out" << endl;
+       pending.new_out.push_back(r);
   }
 
-  dout(1) << "changing OSD map, marking osd" << d << " down" << endl;
-  osdmap->mark_down(d);
-
-  osdmap->inc_epoch();
-  d++;
+  accept_pending();
+  
+  // tell him!
+  send_incremental_map(osdmap->get_epoch()-1, MSG_ADDR_OSD(r));
   
-  // bcast
-  bcast_latest_osd_map_osd();
-    
   // do it again?
   /*
   if (g_conf.num_osd - d > 4 &&
@@ -123,8 +132,8 @@ void Monitor::init()
          osdmap->osds.insert(i+10000);
   }
 
-  osdmap->encode(maps[osdmap->get_epoch()]); // 1
-  pending.epoch = osdmap->get_epoch()+1;     // 2
+  //osdmap->encode(maps[osdmap->get_epoch()]); // 1
+  //pending.epoch = osdmap->get_epoch()+1;     // 2
   // </HACK>
 
 
@@ -160,7 +169,7 @@ void Monitor::dispatch(Message *m)
   {
        switch (m->get_type()) {
        case MSG_FAILURE:
-         handle_failure((MFailure*)m);
+         handle_osd_failure((MOSDFailure*)m);
          break;
          
        case MSG_PING_ACK:
@@ -174,7 +183,13 @@ void Monitor::dispatch(Message *m)
        case MSG_OSD_BOOT:
          handle_osd_boot((MOSDBoot*)m);
          break;
-         
+       case MSG_OSD_IN:
+         handle_osd_in((MOSDIn*)m);
+         break;
+       case MSG_OSD_OUT:
+         handle_osd_out((MOSDOut*)m);
+         break;
+
        case MSG_SHUTDOWN:
          handle_shutdown(m);
          break;
@@ -212,14 +227,10 @@ void Monitor::handle_ping_ack(MPingAck *m)
   delete m;
 }
 
-void Monitor::handle_failure(MFailure *m)
+void Monitor::handle_osd_failure(MOSDFailure *m)
 {
   dout(1) << "osd failure: " << m->get_failed() << " from " << m->get_source() << endl;
   
-  // ack
-  messenger->send_message(new MFailureAck(m),
-                                                 m->get_source(), m->get_source_port());
-
   // FIXME
   // take their word for it
   int from = m->get_failed().num();
@@ -234,10 +245,10 @@ void Monitor::handle_failure(MFailure *m)
        //awaiting_maps[pending.epoch][m->get_source()] = 
 
        accept_pending();
+
        bcast_latest_osd_map_mds();   
-       bcast_latest_osd_map_osd();   // FIXME: which osds can i tell?
 
-       //send_incremental_map(osdmap->get_epoch()-1, m->get_source());
+       send_incremental_map(m->get_epoch(), m->get_source());
   }
 
   delete m;
@@ -277,6 +288,9 @@ void Monitor::handle_osd_boot(MOSDBoot *m)
        if (osdmap->osd_inst.size() == osdmap->osds.size()) {
          dout(-7) << "osd_boot all osds booted." << endl;
          osdmap->inc_epoch();
+         osdmap->encode(maps[osdmap->get_epoch()]); // 1
+         pending.epoch = osdmap->get_epoch()+1;     // 2
+
          bcast_latest_osd_map_osd();
          bcast_latest_osd_map_mds();
        } else {
@@ -313,14 +327,40 @@ void Monitor::handle_osd_boot(MOSDBoot *m)
   bcast_latest_osd_map_mds();
 }
 
+void Monitor::handle_osd_in(MOSDIn *m)
+{
+  dout(7) << "osd_in from " << m->get_source() << endl;
+  int from = m->get_source().num();
+
+  if (osdmap->is_out(from)) 
+       pending.new_in.push_back(from);
+  accept_pending();
+  send_incremental_map(m->map_epoch, m->get_source());
+}
+
+void Monitor::handle_osd_out(MOSDOut *m)
+{
+  dout(7) << "osd_out from " << m->get_source() << endl;
+  int from = m->get_source().num();
+  if (osdmap->is_in(from)) {
+       pending.new_out.push_back(from);
+       accept_pending();
+       send_incremental_map(m->map_epoch, m->get_source());
+  }
+}
+
 void Monitor::handle_osd_getmap(MOSDGetMap *m)
 {
   dout(7) << "osd_getmap from " << m->get_source() << " since " << m->get_since() << endl;
   
-  if (m->get_since())
-       send_incremental_map(m->get_since(), m->get_source());
-  else
-       send_full_map(m->get_source());
+  if (osdmap->get_epoch() == 0) {
+       awaiting_map[1][m->get_source()] = m->get_since();
+  } else {
+       if (m->get_since())
+         send_incremental_map(m->get_since(), m->get_source());
+       else
+         send_full_map(m->get_source());
+  }
   delete m;
 }
 
index e960e49f5c632a68d8383bf6e948864b1cd8a59f..d848d1a53e18eaa0e2260d0378f7bc73c54ff4c8 100644 (file)
@@ -61,10 +61,6 @@ protected:
   void bcast_latest_osd_map_mds();
   void bcast_latest_osd_map_osd();
 
-  void get_min_epoch();
-  void start_read_timer();
-  
-
  public:
   Monitor(int w, Messenger *m) : 
        whoami(w),
@@ -78,9 +74,10 @@ protected:
   void dispatch(Message *m);
   void handle_shutdown(Message *m);
 
-  void handle_failure(class MFailure *m);
-
   void handle_osd_boot(class MOSDBoot *m);
+  void handle_osd_in(class MOSDIn *m);
+  void handle_osd_out(class MOSDOut *m);
+  void handle_osd_failure(class MOSDFailure *m);
   void handle_osd_getmap(class MOSDGetMap *m);
 
   void handle_ping_ack(class MPingAck *m);
@@ -90,6 +87,7 @@ protected:
 
   // hack
   void fake_osd_failure(int osd, bool down);
+  void fake_osdmap_update();
   void fake_reorg();
 
 };
index e8f6b25c1761b92318fcc22d21f6ee875511d636..908d956f6c25ad3f3328f9308c27fd85365a2393 100644 (file)
 
 #include "config.h"
 
+#undef dout
+#define dout(x) if ((x) <= g_conf.debug_ms) cout 
+
+
+
 #include <stdio.h>
 #include <stdlib.h>
 #include <map>
@@ -188,10 +193,11 @@ int fakemessenger_do_loop_2()
          
          if (m) {
                //dout(18) << "got " << m << endl;
-               dout(5) << "---- '" << m->get_type_name() << 
-                 "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() <<
-                 " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- " << m 
-                        << endl;
+               dout(1) << "---- '" << m->get_type_name() 
+                               << "' from " << MSG_ADDR_NICE(m->get_source()) // << ':' << m->get_source_port() 
+                               << " to " << MSG_ADDR_NICE(m->get_dest()) //<< ':' << m->get_dest_port() 
+                               << " ---- " << m 
+                               << endl;
                
                if (g_conf.fakemessenger_serialize) {
                  // encode
@@ -345,7 +351,9 @@ int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromp
        }
        dm->queue_incoming(m);
 
-       dout(5) << "--> sending " << m << " to " << MSG_ADDR_NICE(dest) << endl;//" m " << dm << " has " << dm->num_incoming() << " queued" << endl;
+       dout(1) << "--> " << myaddr << " sending " << m << " '" << m->get_type_name() << "'"
+                       << " to " << MSG_ADDR_NICE(dest) 
+                       << endl;//" m " << dm << " has " << dm->num_incoming() << " queued" << endl;
        
   }
   catch (...) {
index 749fedd658fef07362c4d9dc364a0a9cf78d4a28..205d2a9c147a509d4051404ebfc3e20b99cda3df 100644 (file)
@@ -28,6 +28,8 @@ using namespace std;
 #include "messages/MFailureAck.h"
 
 #include "messages/MOSDBoot.h"
+#include "messages/MOSDIn.h"
+#include "messages/MOSDOut.h"
 #include "messages/MOSDFailure.h"
 #include "messages/MOSDPing.h"
 #include "messages/MOSDOp.h"
@@ -180,6 +182,12 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
   case MSG_OSD_BOOT:
        m = new MOSDBoot();
        break;
+  case MSG_OSD_IN:
+       m = new MOSDIn();
+       break;
+  case MSG_OSD_OUT:
+       m = new MOSDOut();
+       break;
   case MSG_OSD_FAILURE:
        m = new MOSDFailure();
        break;
index bc47cd5f4aec7508e8316d74c8fbde89d927ff89..4b64531a08c8a5d127cecc1f3b43cd1ca00dd043 100644 (file)
@@ -55,6 +55,8 @@
 
 #define MSG_OSD_FAILURE      27
 
+#define MSG_OSD_IN           28
+#define MSG_OSD_OUT          29
 
 
 
index c576261823cf2fc746c8511344b64e28fb922217..10e6d902ba4f9a1989fdc0d7e21f642beb601a79 100644 (file)
@@ -9,7 +9,7 @@ using namespace std;
 #include "mds/MDCluster.h"
 #include "mds/MDS.h"
 #include "osd/OSD.h"
-#include "mds/OSDMonitor.h"
+#include "mon/Monitor.h"
 #include "client/Client.h"
 #include "client/SyntheticClient.h"
 
@@ -104,11 +104,14 @@ int main(int argc, char **argv)
   int world = mpiwho.second;
 
   int need = 0;
-  if (g_conf.tcp_skip_rank0) need++;
+  if (g_conf.ms_skip_rank0) need++;
   need += NUMMDS;
-  need += NUMOSD;
+  if (g_conf.ms_stripe_osds)
+       need++;
+  else
+       need += NUMOSD;
   if (NUMCLIENT) {
-       if (!g_conf.tcp_overlay_clients)
+       if (!g_conf.ms_overlay_clients)
          need += 1;
   }
   assert(need <= world);
@@ -129,7 +132,7 @@ int main(int argc, char **argv)
   
   // create mon
   if (myrank == 0) {
-       OSDMonitor *mon = new OSDMonitor(0, rank.register_entity(MSG_ADDR_MON(0)));
+       Monitor *mon = new Monitor(0, rank.register_entity(MSG_ADDR_MON(0)));
        mon->init();
   }
 
@@ -137,7 +140,7 @@ int main(int argc, char **argv)
   map<int,MDS*> mds;
   map<int,OSD*> mdsosd;
   for (int i=0; i<NUMMDS; i++) {
-       if (myrank != g_conf.tcp_skip_rank0+i) continue;
+       if (myrank != g_conf.ms_skip_rank0+i) continue;
        Messenger *m = rank.register_entity(MSG_ADDR_MDS(i));
        cerr << "mds" << i << " on tcprank " << rank.my_rank << " " << hostname << "." << pid << endl;
        mds[i] = new MDS(mdc, i, m);
@@ -151,9 +154,15 @@ int main(int argc, char **argv)
   }
   
   // create osd
-  map<int, OSD *> osd;
+  map<int,OSD*> osd;
+  int max_osd_nodes = world - NUMMDS - g_conf.ms_skip_rank0;  // assumes 0 clients, if we stripe.
+  int osds_per_node = (NUMOSD-1)/max_osd_nodes + 1;
   for (int i=0; i<NUMOSD; i++) {
-       if (myrank != g_conf.tcp_skip_rank0+NUMMDS + i) continue;
+       if (g_conf.ms_stripe_osds) {
+         if (myrank != g_conf.ms_skip_rank0+NUMMDS + i / osds_per_node) continue;
+       } else {
+         if (myrank != g_conf.ms_skip_rank0+NUMMDS + i) continue;
+       }
        Messenger *m = rank.register_entity(MSG_ADDR_OSD(i));
        cerr << "osd" << i << " on tcprank " << rank.my_rank <<  " " << hostname << "." << pid << endl;
        osd[i] = new OSD(i, m);
@@ -161,21 +170,21 @@ int main(int argc, char **argv)
        started++;
   }
   
-  if (g_conf.tcp_overlay_clients) sleep(5);
+  if (g_conf.ms_overlay_clients) sleep(5);
 
   // create client
   int skip_osd = NUMOSD;
-  if (g_conf.tcp_overlay_clients) 
+  if (g_conf.ms_overlay_clients) 
        skip_osd = 0;        // put clients with osds too!
-  int client_nodes = world - NUMMDS - skip_osd - g_conf.tcp_skip_rank0;
+  int client_nodes = world - NUMMDS - skip_osd - g_conf.ms_skip_rank0;
   int clients_per_node = 1;
-  if (NUMCLIENT) clients_per_node = (NUMCLIENT-1) / client_nodes + 1;
+  if (NUMCLIENT && client_nodes > 0) clients_per_node = (NUMCLIENT-1) / client_nodes + 1;
   set<int> clientlist;
   map<int,Client *> client;//[NUMCLIENT];
   map<int,SyntheticClient *> syn;//[NUMCLIENT];
   for (int i=0; i<NUMCLIENT; i++) {
        //if (myrank != NUMMDS + NUMOSD + i % client_nodes) continue;
-       if (myrank != g_conf.tcp_skip_rank0+NUMMDS + skip_osd + i / clients_per_node) continue;
+       if (myrank != g_conf.ms_skip_rank0+NUMMDS + skip_osd + i / clients_per_node) continue;
        clientlist.insert(i);
        client[i] = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW));//(i)) );
 
index cdced4839d912eb682f279e973100737cf821144..4b558f89cabbc2f3f3667e28525b800dee93eae3 100644 (file)
@@ -97,6 +97,9 @@ void Ager::age(int time,
                           int count,         // this many times
                           float final_water,   // and end here ( <= low_water)
                           int fake_size_mb) { 
+
+  store->_fake_writes(true);
+
   utime_t until = g_clock.now();
   until.sec_ref() += time;
   
@@ -148,6 +151,7 @@ void Ager::age(int time,
          age_empty(low_water);
        }
   }
+  store->_fake_writes(false);
   store->sync();
   store->sync();
   dout(1) << "age finished" << endl;
index 909b5f88ea602723d4eea7e30b16057ca3272f48..451cdc40b8221ecad2690364f306fb027d69f36e 100644 (file)
@@ -42,6 +42,8 @@
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
 #include "messages/MOSDBoot.h"
+#include "messages/MOSDIn.h"
+#include "messages/MOSDOut.h"
 
 #include "messages/MOSDMap.h"
 #include "messages/MOSDGetMap.h"
@@ -126,9 +128,9 @@ OSD::OSD(int id, Messenger *m, char *dev)
 
   // init object store
   // try in this order:
-  // ebofsdev/all
   // ebofsdev/$num
   // ebofsdev/$hostname
+  // ebofsdev/all
 
   if (dev) {
        strcpy(dev_path,dev);
@@ -136,14 +138,15 @@ OSD::OSD(int id, Messenger *m, char *dev)
        char hostname[100];
        hostname[0] = 0;
        gethostname(hostname,100);
-       sprintf(dev_path, "%s/all", ebofs_base_path);
+       
+       sprintf(dev_path, "%s/%d", ebofs_base_path, whoami);
        
        struct stat sta;
        if (::lstat(dev_path, &sta) != 0)
-         sprintf(dev_path, "%s/%d", ebofs_base_path, whoami);
+         sprintf(dev_path, "%s/%s", ebofs_base_path, hostname);        
        
        if (::lstat(dev_path, &sta) != 0)
-         sprintf(dev_path, "%s/%s", ebofs_base_path, hostname);        
+         sprintf(dev_path, "%s/all", ebofs_base_path);
   }
 
   if (g_conf.ebofs) {
@@ -244,13 +247,20 @@ int OSD::init()
        
        osd_logtype.add_inc("r_pull");
        osd_logtype.add_inc("r_pullb");
-       osd_logtype.add_inc("r_push");
-       osd_logtype.add_inc("r_pushb");
        osd_logtype.add_inc("r_wr");
        osd_logtype.add_inc("r_wrb");
        
        osd_logtype.add_inc("rlsum");
        osd_logtype.add_inc("rlnum");
+
+       osd_logtype.add_set("numpg");
+       osd_logtype.add_set("pingset");
+
+       osd_logtype.add_inc("map");
+       osd_logtype.add_inc("mapi");
+       osd_logtype.add_inc("mapidup");
+       osd_logtype.add_inc("mapf");
+       osd_logtype.add_inc("mapfdup");
        
        // request thread pool
        {
@@ -461,7 +471,7 @@ void OSD::heartbeat()
   utime_t since = now;
   since.sec_ref() -= g_conf.osd_heartbeat_interval;
 
-  dout(15) << "heartbeat " << now << endl;
+  dout(-15) << "heartbeat " << now << endl;
 
   // send pings
   set<int> pingset;
@@ -487,6 +497,23 @@ void OSD::heartbeat()
                                                        MSG_ADDR_OSD(*i));
   }
 
+  logger->set("pingset", pingset.size());
+
+  // hack: fake reorg?
+  if (osdmap) {
+       if ((rand() % (2*g_conf.num_osd)) == whoami) {
+         if (osdmap->is_out(whoami)) {
+               messenger->send_message(new MOSDIn(osdmap->get_epoch()),
+                                                               MSG_ADDR_MON(0));
+         } 
+         else if ((rand() % g_conf.fake_osdmap_updates) == 0) {
+               //messenger->send_message(new MOSDOut(osdmap->get_epoch()),
+               messenger->send_message(new MOSDIn(osdmap->get_epoch()),
+                                                               MSG_ADDR_MON(0));
+         }
+       }
+  }
+
   // schedule next!
   next_heartbeat = new C_Heartbeat(this);
   g_timer.add_event_after(g_conf.osd_heartbeat_interval, next_heartbeat);
@@ -535,16 +562,18 @@ void OSD::_share_map_outgoing(msg_addr_t dest)
 {
   assert(dest.is_osd());
 
-  // send map?
-  if (peer_map_epoch.count(dest)) {
-       // ??? send recent ???
-       // do nothing.
-  }
-  else {
-       epoch_t pe = peer_map_epoch[dest];
-       if (pe < osdmap->get_epoch()) {
-         send_incremental_map(pe, dest, true);
-         peer_map_epoch[dest] = osdmap->get_epoch();
+  if (dest.is_osd()) {
+       // send map?
+       if (peer_map_epoch.count(dest)) {
+         epoch_t pe = peer_map_epoch[dest];
+         if (pe < osdmap->get_epoch()) {
+               send_incremental_map(pe, dest, true);
+               peer_map_epoch[dest] = osdmap->get_epoch();
+         }
+       } else {
+         // no idea about peer's epoch.
+         // ??? send recent ???
+         // do nothing.
        }
   }
 }
@@ -625,8 +654,7 @@ void OSD::dispatch(Message *m)
 
          case MSG_OSD_PING:
                // take note.
-               dout(20) << "osdping from " << m->get_source() << endl;
-               _share_map_incoming(m->get_source(), ((MOSDPing*)m)->map_epoch);
+               handle_osd_ping((MOSDPing*)m);
                break;
                
          case MSG_OSD_PG_NOTIFY:
@@ -809,15 +837,16 @@ void OSD::handle_rep_op_ack(PG *pg, __uint64_t tid, int result, bool commit,
 
 
 
-/*
-void OSD::handle_ping(MPing *m)
+void OSD::handle_osd_ping(MOSDPing *m)
 {
-  dout(7) << "got ping, replying" << endl;
-  messenger->send_message(new MPingAck(m),
-                                                 m->get_source(), m->get_source_port(), 0);
-  delete m;
+  dout(20) << "osdping from " << m->get_source() << endl;
+  _share_map_incoming(m->get_source(), ((MOSDPing*)m)->map_epoch);
+  
+  //if (!m->ack)
+  //messenger->send_message(new MOSDPing(osdmap->get_epoch(), true),
+  //m->get_source());
 }
-*/
+
 
 
 
@@ -856,6 +885,8 @@ void OSD::handle_osd_map(MOSDMap *m)
        boot_epoch = m->get_last(); // hrm...?
   }
 
+  logger->inc("mapmsg");
+
   // store them?
   for (map<epoch_t,bufferlist>::iterator p = m->maps.begin();
           p != m->maps.end();
@@ -863,6 +894,7 @@ void OSD::handle_osd_map(MOSDMap *m)
        object_t oid = get_osdmap_object_name(p->first);
        if (store->exists(oid)) {
          dout(10) << "handle_osd_map already had full map epoch " << p->first << endl;
+         logger->inc("mapfdup");
          bufferlist bl;
          get_map_bl(p->first, bl);
          dout(10) << " .. it is " << bl.length() << " bytes" << endl;
@@ -878,6 +910,8 @@ void OSD::handle_osd_map(MOSDMap *m)
        if (p->first < superblock.oldest_map ||
                superblock.oldest_map == 0)
          superblock.oldest_map = p->first;
+
+       logger->inc("mapf");
   }
   for (map<epoch_t,bufferlist>::iterator p = m->incremental_maps.begin();
           p != m->incremental_maps.end();
@@ -885,6 +919,7 @@ void OSD::handle_osd_map(MOSDMap *m)
        object_t oid = get_inc_osdmap_object_name(p->first);
        if (store->exists(oid)) {
          dout(10) << "handle_osd_map already had incremental map epoch " << p->first << endl;
+         logger->inc("mapidup");
          bufferlist bl;
          get_inc_map_bl(p->first, bl);
          dout(10) << " .. it is " << bl.length() << " bytes" << endl;
@@ -900,6 +935,8 @@ void OSD::handle_osd_map(MOSDMap *m)
        if (p->first < superblock.oldest_map ||
                superblock.oldest_map == 0)
          superblock.oldest_map = p->first;
+
+       logger->inc("mapi");
   }
 
   // advance if we can
@@ -1257,6 +1294,8 @@ void OSD::activate_map(ObjectStore::Transaction& t)
   
   // do queries.
   do_queries(query_map);
+
+  logger->set("numpg", pg_map.size());
 }
 
 
index cbc8918a08addcd8d9e20f9cb967927fb4825e39..7bff2e909d45245892ccf96fba42759def0e27a6 100644 (file)
@@ -264,7 +264,7 @@ public:
   virtual Message *ms_handle_failure(msg_addr_t dest, entity_inst_t& inst);
   virtual bool ms_lookup(msg_addr_t dest, entity_inst_t& inst);
 
-  //void handle_ping(class MPing *m);
+  void handle_osd_ping(class MOSDPing *m);
   void handle_op(class MOSDOp *m);
 
   void op_read(class MOSDOp *m, PG *pg);
index d5cc79d157aeeaa1f6d607c810e08a509e82fb50..e948183179e42cea9de9e651498832d2b9bdb5cc 100644 (file)
@@ -415,6 +415,7 @@ public:
   
   virtual void sync() {};
   
+  virtual void _fake_writes(bool b) {};
   
 };
 
index 74da531d9cadea3d5a7eaa48e1037bc250b13078..1d2d5b0e520d0645d3622b246154a28b30f0b08c 100644 (file)
@@ -9,7 +9,7 @@ using namespace std;
 #include "mds/MDCluster.h"
 #include "mds/MDS.h"
 #include "osd/OSD.h"
-#include "mds/OSDMonitor.h"
+#include "mon/Monitor.h"
 #include "client/Client.h"
 #include "client/SyntheticClient.h"
 
@@ -129,7 +129,7 @@ int main(int argc, char **argv)
   
   // create mon
   if (myrank == 0) {
-       OSDMonitor *mon = new OSDMonitor(0, new TCPMessenger(MSG_ADDR_MON(0)));
+       Monitor *mon = new Monitor(0, new TCPMessenger(MSG_ADDR_MON(0)));
        mon->init();
   }