]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
merge branches/riccardo/monitor1 changes into trunk (pg log storage fix, monitorstore...
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 26 Feb 2007 00:17:32 +0000 (00:17 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 26 Feb 2007 00:17:32 +0000 (00:17 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1130 29311d96-e01e-0410-9327-a35deaab8ce9

32 files changed:
trunk/ceph/Makefile
trunk/ceph/client/Client.cc
trunk/ceph/client/Client.h
trunk/ceph/config.cc
trunk/ceph/config.h
trunk/ceph/include/object.h
trunk/ceph/messages/MMonPaxos.h [new file with mode: 0644]
trunk/ceph/mon/Elector.cc
trunk/ceph/mon/Elector.h
trunk/ceph/mon/MDSMonitor.cc
trunk/ceph/mon/MDSMonitor.h
trunk/ceph/mon/Monitor.cc
trunk/ceph/mon/Monitor.h
trunk/ceph/mon/MonitorStore.cc [new file with mode: 0644]
trunk/ceph/mon/MonitorStore.h [new file with mode: 0644]
trunk/ceph/mon/OSDMonitor.cc
trunk/ceph/mon/OSDMonitor.h
trunk/ceph/mon/Paxos.cc [new file with mode: 0644]
trunk/ceph/mon/Paxos.h [new file with mode: 0644]
trunk/ceph/msg/Dispatcher.h
trunk/ceph/msg/FakeMessenger.cc
trunk/ceph/msg/Message.cc
trunk/ceph/msg/Message.h
trunk/ceph/msg/SimpleMessenger.cc
trunk/ceph/msg/msg_types.h
trunk/ceph/osd/FakeStore.cc
trunk/ceph/osd/FakeStore.h
trunk/ceph/osd/OSD.cc
trunk/ceph/osd/ObjectStore.h
trunk/ceph/osd/PG.cc
trunk/ceph/osd/PG.h
trunk/ceph/osd/osd_types.h

index ae214a73e5bd5dc3f2557d1068de6c636b25d78a..67097eab65a2a485adb9a6d768b2fb5e1624ce82 100644 (file)
@@ -83,10 +83,12 @@ OSDC_OBJS= \
 
 MON_OBJS= \
        mon/Monitor.o\
+       mon/Paxos.o\
        mon/OSDMonitor.o\
        mon/MDSMonitor.o\
        mon/ClientMonitor.o\
-       mon/Elector.o
+       mon/Elector.o\
+       mon/MonitorStore.o
 
 COMMON_OBJS= \
        msg/Message.o\
@@ -109,7 +111,7 @@ OSBDB_OBJS = \
 OSBDB_OBJ = osbdb.o
 endif
 
-TARGETS = cmon cosd cmds cfuse csyn newsyn fakesyn
+TARGETS = cmon cosd cmds cfuse csyn newsyn fakesyn mkmonmap
 
 SRCS=*.cc */*.cc *.h */*.h */*/*.h
 
@@ -124,7 +126,7 @@ obfs: depend obfstest
 mkmonmap: mkmonmap.cc common.o
        ${CC} ${CFLAGS} ${LIBS} $^ -o $@
 
-cmon: cmon.cc mon.o ebofs.o msg/SimpleMessenger.o common.o
+cmon: cmon.cc mon.o msg/SimpleMessenger.o common.o
        ${CC} ${CFLAGS} ${LIBS} $^ -o $@
 
 cosd: cosd.cc osd.o ebofs.o ${OSBDB_OBJ} msg/SimpleMessenger.o common.o
index 2b85f1a7ad97d7461b86e618008c36eb4806bd43..19153ae30803e6fc2ba097c323bcd4e0218655e7 100644 (file)
@@ -2617,8 +2617,10 @@ int Client::lazyio_synchronize(int fd, off_t offset, size_t count)
 }
 
 
-void Client::ms_handle_failure(Message *m, entity_name_t dest, const entity_inst_t& inst)
+void Client::ms_handle_failure(Message *m, const entity_inst_t& inst)
 {
+  entity_name_t dest = inst.name;
+
   if (dest.is_mon()) {
     // resend to a different monitor.
     int mon = monmap->pick_mon(true);
index de16639c89e642affe398c30bbba6fbb64265b4a..e00c6eae3ca4814e7a6ae82fa560eb6176bc0821 100644 (file)
@@ -582,7 +582,7 @@ protected:
 
   int describe_layout(char *fn, list<ObjectExtent>& result);
 
-  void ms_handle_failure(Message*, entity_name_t dest, const entity_inst_t& inst);
+  void ms_handle_failure(Message*, const entity_inst_t& inst);
 };
 
 #endif
index 9219d3b9d16bf15ecffec16b690f68d94a1ee929..b164218d81a8ebfde3d7fcb3fefe12061b17d19c 100644 (file)
@@ -221,7 +221,8 @@ md_config_t g_conf = {
   fakestore_fsync: false,//true,
   fakestore_writesync: false,
   fakestore_syncthreads: 4,
-  fakestore_fakeattr: true,   
+  fakestore_fake_attrs: false,
+  fakestore_fake_collections: false,   
   fakestore_dev: 0,
 
   // --- ebofs ---
@@ -685,6 +686,10 @@ void parse_config_options(std::vector<char*>& args)
       g_conf.fakestore_writesync = atoi(args[++i]);
     else if (strcmp(args[i], "--fakestore_dev") == 0) 
       g_conf.fakestore_dev = args[++i];
+    else if (strcmp(args[i], "--fakestore_fake_attrs") == 0) 
+      g_conf.fakestore_fake_attrs = true;//atoi(args[++i]);
+    else if (strcmp(args[i], "--fakestore_fake_collections") == 0) 
+      g_conf.fakestore_fake_collections = true;//atoi(args[++i]);
 
     else if (strcmp(args[i], "--obfs") == 0) {
       g_conf.uofs = 1;
index b0edea33ffacb5d8f54e0e5cbb52d7b2603dd81d..cf0b4540b07c88a5c1171f120e1622a8292e7ac4 100644 (file)
@@ -211,7 +211,8 @@ struct md_config_t {
   bool  fakestore_fsync;
   bool  fakestore_writesync;
   int   fakestore_syncthreads;   // such crap
-  bool  fakestore_fakeattr;
+  bool  fakestore_fake_attrs;
+  bool  fakestore_fake_collections;
   char  *fakestore_dev;
 
   // ebofs
index 3a66c4ab83d540440a6a21a6e353c1ec2bf31299..9773ecb4b3288962fe81b4c985b8838bf81fcb91 100644 (file)
@@ -30,6 +30,7 @@ struct object_t {
 
   object_t() : ino(0), bno(0), rev(0) {}
   object_t(__uint64_t i, __uint32_t b) : ino(i), bno(b), rev(0) {}
+  object_t(__uint64_t i, __uint32_t b, __uint32_t r) : ino(i), bno(b), rev(r) {}
 };
 
 
diff --git a/trunk/ceph/messages/MMonPaxos.h b/trunk/ceph/messages/MMonPaxos.h
new file mode 100644 (file)
index 0000000..b3f6e85
--- /dev/null
@@ -0,0 +1,80 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MMONPAXOS_H
+#define __MMONPAXOS_H
+
+#include "msg/Message.h"
+
+class MMonPaxos : public Message {
+ public:
+  // op types
+  const static int OP_COLLECT = 1;   // proposer: propose round
+  const static int OP_LAST = 2;                 // voter:    accept proposed round
+  const static int OP_OLDROUND = 3;     // voter:    notify proposer he proposed an old round
+  const static int OP_BEGIN = 4;        // proposer: value proposed for this round
+  const static int OP_ACCEPT = 5;       // voter:    accept propsed value
+  const static int OP_SUCCESS = 7;   // proposer: notify learners of agreed value
+  const static int OP_ACK = 8;          // learner:  notify proposer that new value has been saved
+
+  int op;   
+  int machine_id;
+  version_t proposal;
+  version_t n;
+  bufferlist value;
+
+  MMonPaxos() : Message(MSG_MON_PAXOS) {}
+  MMonPaxos(int o, int mid, 
+           version_t pn, version_t v) : Message(MSG_MON_PAXOS),
+                                            op(o), machine_id(mid), 
+                                            proposal(pn), n(v) {}
+  MMonPaxos(int o, int mid, 
+           version_t pn, version_t v, 
+           bufferlist& b) : Message(MSG_MON_PAXOS),
+                            op(o), machine_id(mid),
+                            proposal(pn), n(v), 
+                            value(b) {}
+  
+  virtual char *get_type_name() { return "paxos"; }
+  
+  void print(ostream& out) {
+    out << "paxos(op " << op
+       << ", machine " << machine_id
+       << ", proposal " << proposal 
+       << ", state " << n 
+       << ", " << value.length() << " bytes)";
+  }
+
+  void encode_payload() {
+    payload.append((char*)&op, sizeof(op));
+    payload.append((char*)&machine_id, sizeof(machine_id));
+    payload.append((char*)&proposal, sizeof(proposal));
+    payload.append((char*)&n, sizeof(n));
+    ::_encode(value, payload);
+  }
+  void decode_payload() {
+    int off = 0;
+    payload.copy(off, sizeof(op), (char*)&op);
+    off += sizeof(op);
+    payload.copy(off, sizeof(machine_id), (char*)&machine_id);
+    off += sizeof(machine_id);
+    payload.copy(off, sizeof(proposal), (char*)&proposal);
+    off += sizeof(proposal);
+    payload.copy(off, sizeof(n), (char*)&n);
+    off += sizeof(n);
+    ::_decode(value, payload, off);
+  }
+};
+
+#endif
index 563fcc687489b2cc72893a67a108ccd4eea4e15c..d3098ba065a470bddf04c6177abc3ffe8b350d5e 100644 (file)
@@ -1,3 +1,15 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
 
 #include "Elector.h"
 #include "Monitor.h"
@@ -28,9 +40,9 @@ void Elector::start()
   
   // bcast to everyone else
   for (int i=0; i<mon->monmap->num_mon; ++i) {
-       if (i == whoami) continue;
-       mon->messenger->send_message(new MMonElectionPropose,
-                                                                mon->monmap->get_inst(i));
+    if (i == whoami) continue;
+    mon->messenger->send_message(new MMonElectionPropose,
+                                mon->monmap->get_inst(i));
   }
   
   reset_timer();
@@ -41,18 +53,18 @@ void Elector::defer(int who)
   dout(5) << "defer to " << who << endl;
 
   if (electing_me) {
-       acked_me.clear();
-       electing_me = false;
+    acked_me.clear();
+    electing_me = false;
   }
 
   // ack them
   leader_acked = who;
   ack_stamp = g_clock.now();
   mon->messenger->send_message(new MMonElectionAck,
-                                                          mon->monmap->get_inst(who));
+                              mon->monmap->get_inst(who));
   
   // set a timer
-  reset_timer();
+  reset_timer(1.0);  // give the leader some extra time to declare victory
 }
 
 
@@ -61,24 +73,24 @@ class C_Mon_ElectionExpire : public Context {
 public:
   C_Mon_ElectionExpire(Elector *e) : elector(e) { }
   void finish(int r) {
-       elector->expire();
+    elector->expire();
   }
 };
 
-void Elector::reset_timer()
+void Elector::reset_timer(double plus)
 {
   // set the timer
   cancel_timer();
   expire_event = new C_Mon_ElectionExpire(this);
-  g_timer.add_event_after(g_conf.mon_lease,
-                                                        expire_event);
+  g_timer.add_event_after(g_conf.mon_lease + plus,
+                         expire_event);
 }
 
 
 void Elector::cancel_timer()
 {
   if (expire_event)
-       g_timer.cancel_event(expire_event);
+    g_timer.cancel_event(expire_event);
 }
 
 void Elector::expire()
@@ -87,12 +99,12 @@ void Elector::expire()
   
   // did i win?
   if (electing_me &&
-         acked_me.size() > (unsigned)(mon->monmap->num_mon / 2)) {
-       // i win
-       victory();
+      acked_me.size() > (unsigned)(mon->monmap->num_mon / 2)) {
+    // i win
+    victory();
   } else {
-       // whoever i deferred to didn't declare victory quickly enough.
-       start();
+    // whoever i deferred to didn't declare victory quickly enough.
+    start();
   }
 }
 
@@ -102,11 +114,13 @@ void Elector::victory()
   leader_acked = -1;
   electing_me = false;
 
+  cancel_timer();
+
   // tell everyone
   for (int i=0; i<mon->monmap->num_mon; ++i) {
-       if (i == whoami) continue;
-       mon->messenger->send_message(new MMonElectionVictory,
-                                                                mon->monmap->get_inst(i));
+    if (i == whoami) continue;
+    mon->messenger->send_message(new MMonElectionVictory,
+                                mon->monmap->get_inst(i));
   }
     
   // tell monitor
@@ -120,18 +134,19 @@ void Elector::handle_propose(MMonElectionPropose *m)
   int from = m->get_source().num();
 
   if (from > whoami) {
-       // wait, i should win!
-       if (!electing_me)
-         start();
+    // wait, i should win!
+    if (!electing_me)
+      start();
   } else {
-       // they would win over me
-       if (leader_acked < 0 ||      // haven't acked anyone yet, or
-               leader_acked > from) {   // they would win over who you did ack
-         defer(from);
-       } else {
-         // ignore them!
-         dout(5) << "no, we already acked " << leader_acked << endl;
-       }
+    // they would win over me
+    if (leader_acked < 0 ||      // haven't acked anyone yet, or
+       leader_acked > from ||   // they would win over who you did ack, or
+       leader_acked == from) {  // this is the guy we're already deferring to
+      defer(from);
+    } else {
+      // ignore them!
+      dout(5) << "no, we already acked " << leader_acked << endl;
+    }
   }
   
   delete m;
@@ -143,17 +158,17 @@ void Elector::handle_ack(MMonElectionAck *m)
   int from = m->get_source().num();
   
   if (electing_me) {
-       // thanks
-       acked_me.insert(from);
-       dout(5) << " so far i have " << acked_me << endl;
-       
-       // is that _everyone_?
-       if (acked_me.size() == (unsigned)mon->monmap->num_mon) {
-         // if yes, shortcut to election finish
-         victory();
-       }
+    // thanks
+    acked_me.insert(from);
+    dout(5) << " so far i have " << acked_me << endl;
+    
+    // is that _everyone_?
+    if (acked_me.size() == (unsigned)mon->monmap->num_mon) {
+      // if yes, shortcut to election finish
+      victory();
+    }
   } else {
-       // ignore, i'm deferring already.
+    // ignore, i'm deferring already.
   }
   
   delete m;
@@ -165,14 +180,14 @@ void Elector::handle_victory(MMonElectionVictory *m)
   int from = m->get_source().num();
   
   if (from < whoami) {
-       // ok, fine, they win
-       mon->lose_election(from);
-
-       // cancel my timer
-       cancel_timer(); 
+    // ok, fine, they win
+    mon->lose_election(from);
+    
+    // cancel my timer
+    cancel_timer();    
   } else {
-       // no, that makes no sense, i should win.  start over!
-       start();
+    // no, that makes no sense, i should win.  start over!
+    start();
   }
 }
 
@@ -183,19 +198,19 @@ void Elector::dispatch(Message *m)
 {
   switch (m->get_type()) {
   case MSG_MON_ELECTION_ACK:
-       handle_ack((MMonElectionAck*)m);
-       break;
+    handle_ack((MMonElectionAck*)m);
+    break;
     
   case MSG_MON_ELECTION_PROPOSE:
-       handle_propose((MMonElectionPropose*)m);
-       break;
+    handle_propose((MMonElectionPropose*)m);
+    break;
     
   case MSG_MON_ELECTION_VICTORY:
-       handle_victory((MMonElectionVictory*)m);
-       break;
-       
+    handle_victory((MMonElectionVictory*)m);
+    break;
+    
   default:
-       assert(0);
+    assert(0);
   }
 }
 
index bc556ae04ea7cb20399b1af81259b3af5601cbec..67ed59945c46bc7b849c5ed12766de5b42be31e1 100644 (file)
@@ -35,7 +35,7 @@ class Elector {
 
   Context *expire_event;
 
-  void reset_timer();
+  void reset_timer(double plus=0.0);
   void cancel_timer();
 
   // electing me
index 3ea7ad235ada0223f1d5c3e034f562fad3ca505c..24beadf85e9f07205446d16d27dfb10b0a71fb3b 100644 (file)
@@ -14,6 +14,7 @@
 
 #include "MDSMonitor.h"
 #include "Monitor.h"
+#include "MonitorStore.h"
 
 #include "messages/MMDSMap.h"
 #include "messages/MMDSGetMap.h"
 
 /********* MDS map **************/
 
-void MDSMonitor::create_initial()
-{
-  mdsmap.epoch = 0;  // until everyone boots
-  mdsmap.ctime = g_clock.now();
-  
-  print_map();
-}
-
 void MDSMonitor::dispatch(Message *m)
 {
   switch (m->get_type()) {
@@ -55,6 +48,50 @@ void MDSMonitor::dispatch(Message *m)
   }  
 }
 
+
+
+void MDSMonitor::election_finished()
+{
+  if (mon->is_leader()) {
+
+    // FIXME be smarter later.
+
+    if (g_conf.mkfs) {
+      create_initial();
+      save_map();
+    } else {
+      load_map();
+    }
+  }
+}
+
+
+void MDSMonitor::create_initial()
+{
+  mdsmap.epoch = 0;  // until everyone boots
+  mdsmap.ctime = g_clock.now();
+
+  mdsmap.encode(encoded_map);
+
+  print_map();
+}
+
+void MDSMonitor::load_map()
+{
+  int r = mon->store->get_bl_ss(encoded_map, "mdsmap", "current");
+  assert(r > 0);
+  mdsmap.decode(encoded_map);
+  dout(7) << "load_map epoch " << mdsmap.get_epoch() << endl;
+}
+
+void MDSMonitor::save_map()
+{
+  dout(7) << "save_map epoch " << mdsmap.get_epoch() << endl;
+  
+  int r = mon->store->put_bl_ss(encoded_map, "mdsmap", "current");
+  assert(r>=0);
+}
+
 void MDSMonitor::print_map()
 {
   dout(7) << "print_map epoch " << mdsmap.get_epoch() << endl;
@@ -72,6 +109,22 @@ void MDSMonitor::print_map()
 }
 
 
+void MDSMonitor::issue_map()
+{
+  mdsmap.inc_epoch();
+  encoded_map.clear();
+  mdsmap.encode(encoded_map);
+
+  dout(7) << "issue_map epoch " << mdsmap.get_epoch() << endl;
+  
+  save_map();
+  print_map();
+  
+  // bcast map
+  bcast_latest_mds();
+  send_current();
+}
+
 
 void MDSMonitor::handle_mds_beacon(MMDSBeacon *m)
 {
@@ -189,15 +242,7 @@ void MDSMonitor::handle_mds_beacon(MMDSBeacon *m)
     else
       mdsmap.mds_state_seq.erase(from);
     
-    // inc map version
-    mdsmap.inc_epoch();
-    mdsmap.encode(maps[mdsmap.get_epoch()]);
-    
-    print_map();
-
-    // bcast map
-    bcast_latest_mds();
-    send_current();
+    issue_map();
   }
 
   delete m;
@@ -319,14 +364,7 @@ void MDSMonitor::tick()
     }
 
     if (changed) {
-      mdsmap.inc_epoch();
-      mdsmap.encode(maps[mdsmap.get_epoch()]);
-      
-      print_map();
-      
-      // bcast map
-      bcast_latest_mds();
-      send_current();
+      issue_map();
     }
   }
 }
index 31c3e2a3d242b1c3b45a6598257ddc19183dfcf1..c3bc3d165883c222ba156c7a347f122e8f29e175 100644 (file)
@@ -35,7 +35,7 @@ class MDSMonitor : public Dispatcher {
   MDSMap mdsmap;
 
  private:
-  map<epoch_t, bufferlist> maps;
+  bufferlist encoded_map;
 
   //map<epoch_t, bufferlist> inc_maps;
   //MDSMap::Incremental pending_inc;
@@ -54,6 +54,10 @@ class MDSMonitor : public Dispatcher {
   void send_full(entity_inst_t dest);
   void bcast_latest_mds();
 
+  void issue_map();
+
+  void save_map();
+  void load_map();
   void print_map();
 
   //void accept_pending();   // accept pending, new map.
@@ -68,12 +72,14 @@ class MDSMonitor : public Dispatcher {
 
  public:
   MDSMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l) {
-    create_initial();
   }
 
   void dispatch(Message *m);
   void tick();  // check state, take actions
 
+  void election_starting();
+  void election_finished();
+
   void send_latest(entity_inst_t dest);
 
 };
index 8261b52cc3fe56002a68716c91d0fae5d1bf7715..57be61def3d1114f39e05883287ba1325f74700b 100644 (file)
@@ -17,7 +17,7 @@
 
 #include "osd/OSDMap.h"
 
-#include "ebofs/Ebofs.h"
+#include "MonitorStore.h"
 
 #include "msg/Message.h"
 #include "msg/Messenger.h"
@@ -26,6 +26,8 @@
 #include "messages/MPingAck.h"
 #include "messages/MGenericMessage.h"
 
+#include "messages/MMonPaxos.h"
+
 #include "common/Timer.h"
 #include "common/Clock.h"
 
@@ -48,13 +50,13 @@ void Monitor::init()
   
   // store
   char s[80];
-  sprintf(s, "dev/mon%d", whoami);
-  store = new Ebofs(s);
+  sprintf(s, "mondata/mon%d", whoami);
+  store = new MonitorStore(s);
 
   if (g_conf.mkfs)
     store->mkfs();
-  int r = store->mount();
-  assert(r >= 0);
+  else
+    store->mount();
 
   // create 
   osdmon = new OSDMonitor(this, messenger, lock);
@@ -90,12 +92,6 @@ void Monitor::shutdown()
   timer.cancel_all();
   timer.join();
   
-  // unmount my local storage
-  if (store) {
-    store->umount();
-    delete store;
-  }
-  
   // stop osds.
   for (set<int>::iterator it = osdmon->osdmap.get_osds().begin();
        it != osdmon->osdmap.get_osds().end();
@@ -105,6 +101,7 @@ void Monitor::shutdown()
     messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
                            osdmon->osdmap.get_inst(*it));
   }
+  osdmon->mark_all_down();
   
   // monitors too.
   for (int i=0; i<monmap->num_mon; i++)
@@ -112,6 +109,10 @@ void Monitor::shutdown()
       messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), 
                              monmap->get_inst(i));
 
+  // unmount my local storage
+  if (store) 
+    delete store;
+  
   // clean up
   if (monmap) delete monmap;
   if (osdmon) delete osdmon;
@@ -146,7 +147,10 @@ void Monitor::win_election(set<int>& active)
 
   // init
   osdmon->election_finished();
-  //mdsmon->election_finished();
+  mdsmon->election_finished();
+
+  // init paxos
+  test_paxos.leader_start();
 } 
 
 void Monitor::lose_election(int l) 
@@ -203,6 +207,21 @@ void Monitor::dispatch(Message *m)
       break;
 
 
+      // paxos
+    case MSG_MON_PAXOS:
+      // send it to the right paxos instance
+      switch (((MMonPaxos*)m)->machine_id) {
+      case PAXOS_TEST:
+       test_paxos.dispatch(m);
+       break;
+      case PAXOS_OSDMAP:
+       //...
+       
+      default:
+       assert(0);
+      }
+      break;
+
       // elector messages
     case MSG_MON_ELECTION_PROPOSE:
     case MSG_MON_ELECTION_ACK:
index 0b8f921720d5b1c1dff3d94ed8de13ca8393418c..6554ad36239b1b153d2df3e84b9b53e934676c42 100644 (file)
 
 #include "MonMap.h"
 #include "Elector.h"
+#include "Paxos.h"
 
-class ObjectStore;
+
+class MonitorStore;
 class OSDMonitor;
 class MDSMonitor;
 class ClientMonitor;
 
+#define PAXOS_TEST       0
+#define PAXOS_OSDMAP     1
+#define PAXOS_MDSMAP     2
+#define PAXOS_CLIENTMAP  3
+
 class Monitor : public Dispatcher {
 protected:
   // me
@@ -45,7 +52,8 @@ protected:
   friend class C_Mon_Tick;
 
   // my local store
-  ObjectStore *store;
+  //ObjectStore *store;
+  MonitorStore *store;
 
   const static int INO_ELECTOR = 1;
   const static int INO_MON_MAP = 2;
@@ -62,6 +70,11 @@ protected:
 
   //void call_election();
 
+  // paxos
+  Paxos test_paxos;
+  friend class Paxos;
+
+
   // monitor state
   const static int STATE_STARTING = 0; // electing
   const static int STATE_LEADER =   1;
@@ -96,6 +109,7 @@ protected:
   void lose_election(int l);
 
 
+
  public:
   Monitor(int w, Messenger *m, MonMap *mm) : 
     whoami(w), 
@@ -105,6 +119,9 @@ protected:
     store(0),
     elector(this, w),
     mon_epoch(0), 
+    
+    test_paxos(this, w, PAXOS_TEST, "tester"),  // tester state machine
+
     state(STATE_STARTING),
     leader(0),
     osdmon(0), mdsmon(0), clientmon(0)
diff --git a/trunk/ceph/mon/MonitorStore.cc b/trunk/ceph/mon/MonitorStore.cc
new file mode 100644 (file)
index 0000000..f93bb20
--- /dev/null
@@ -0,0 +1,198 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "MonitorStore.h"
+#include "common/Clock.h"
+
+#include "config.h"
+#undef dout
+#define  dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " store(" << dir <<") "
+#define  derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " store(" << dir <<") "
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+
+void MonitorStore::mount()
+{
+  dout(1) << "mount" << endl;
+  // verify dir exists
+  DIR *d = ::opendir(dir.c_str());
+  if (!d) {
+    derr(1) << "basedir " << dir << " dne" << endl;
+    assert(0);
+  }
+  ::closedir(d);
+}
+
+
+void MonitorStore::mkfs()
+{
+  dout(1) << "mkfs" << endl;
+
+  char cmd[200];
+  sprintf(cmd, "test -d %s && /bin/rm -r %s ; mkdir -p %s", dir.c_str(), dir.c_str(), dir.c_str());
+  dout(1) << cmd << endl;
+  system(cmd);
+}
+
+
+version_t MonitorStore::get_int(const char *a, const char *b)
+{
+  char fn[200];
+  if (b)
+    sprintf(fn, "%s/%s/%s", dir.c_str(), a, b);
+  else
+    sprintf(fn, "%s/%s", dir.c_str(), a);
+  
+  FILE *f = ::fopen(fn, "r");
+  if (!f) 
+    return 0;
+  
+  char buf[20];
+  ::fgets(buf, 20, f);
+  ::fclose(f);
+  
+  version_t val = atoi(buf);
+  
+  if (b) {
+    dout(15) << "get_int " << a << "/" << b << " = " << val << endl;
+  } else {
+    dout(15) << "get_int " << a << " = " << val << endl;
+  }
+  return val;
+}
+
+
+void MonitorStore::put_int(version_t val, const char *a, const char *b)
+{
+  char fn[200];
+  sprintf(fn, "%s/%s", dir.c_str(), a);
+  if (b) {
+    ::mkdir(fn, 0755);
+    dout(15) << "set_int " << a << "/" << b << " = " << val << endl;
+    sprintf(fn, "%s/%s/%s", dir.c_str(), a, b);
+  } else {
+    dout(15) << "set_int " << a << " = " << val << endl;
+  }
+  
+  char vs[30];
+  sprintf(vs, "%lld\n", val);
+
+  char tfn[200];
+  sprintf(tfn, "%s.new", fn);
+
+  int fd = ::open(tfn, O_WRONLY|O_CREAT);
+  assert(fd > 0);
+  ::fchmod(fd, 0644);
+  ::write(fd, vs, strlen(vs));
+  ::close(fd);
+  ::rename(tfn, fn);
+}
+
+
+// ----------------------------------------
+// buffers
+
+bool MonitorStore::exists_bl_ss(const char *a, const char *b)
+{
+  char fn[200];
+  if (b) {
+    dout(15) << "exists_bl " << a << "/" << b << endl;
+    sprintf(fn, "%s/%s/%s", dir.c_str(), a, b);
+  } else {
+    dout(15) << "exists_bl " << a << endl;
+    sprintf(fn, "%s/%s", dir.c_str(), a);
+  }
+  
+  struct stat st;
+  int r = ::stat(fn, &st);
+  return r == 0;
+}
+
+
+int MonitorStore::get_bl_ss(bufferlist& bl, const char *a, const char *b)
+{
+  char fn[200];
+  if (b) {
+    sprintf(fn, "%s/%s/%s", dir.c_str(), a, b);
+  } else {
+    sprintf(fn, "%s/%s", dir.c_str(), a);
+  }
+  
+  int fd = ::open(fn, O_RDONLY);
+  if (!fd) {
+    if (b) {
+      dout(15) << "get_bl " << a << "/" << b << " DNE" << endl;
+    } else {
+      dout(15) << "get_bl " << a << " DNE" << endl;
+    }
+    return 0;
+  }
+
+  // read size
+  __int32_t len = 0;
+  ::read(fd, &len, sizeof(len));
+  
+  // read buffer
+  bl.clear();
+  bufferptr bp(len);
+  ::read(fd, bp.c_str(), len);
+  bl.append(bp);
+  ::close(fd);
+
+  if (b) {
+    dout(15) << "get_bl " << a << "/" << b << " = " << bl.length() << " bytes" << endl;
+  } else {
+    dout(15) << "get_bl " << a << " = " << bl.length() << " bytes" << endl;
+  }
+
+  return len;
+}
+
+int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b)
+{
+  char fn[200];
+  sprintf(fn, "%s/%s", dir.c_str(), a);
+  if (b) {
+    ::mkdir(fn, 0755);
+    dout(15) << "put_bl " << a << "/" << b << " = " << bl.length() << " bytes" << endl;
+    sprintf(fn, "%s/%s/%s", dir.c_str(), a, b);
+  } else {
+    dout(15) << "put_bl " << a << " = " << bl.length() << " bytes" << endl;
+  }
+  
+  char tfn[200];
+  sprintf(tfn, "%s.new", fn);
+  int fd = ::open(tfn, O_WRONLY|O_CREAT);
+  assert(fd);
+  
+  // write size
+  __int32_t len = bl.length();
+  ::write(fd, &len, sizeof(len));
+
+  // write data
+  for (list<bufferptr>::const_iterator it = bl.buffers().begin();
+       it != bl.buffers().end();
+       it++) 
+    ::write(fd, it->c_str(), it->length());
+
+  ::fchmod(fd, 0644);
+  ::fsync(fd);
+  ::close(fd);
+  ::rename(tfn, fn);
+
+  return 0;
+}
diff --git a/trunk/ceph/mon/MonitorStore.h b/trunk/ceph/mon/MonitorStore.h
new file mode 100644 (file)
index 0000000..f1d5f67
--- /dev/null
@@ -0,0 +1,69 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MON_MONITORSTORE_H
+#define __MON_MONITORSTORE_H
+
+#include "include/types.h"
+#include "include/buffer.h"
+
+#include <string.h>
+
+class MonitorStore {
+  string dir;
+
+public:
+  MonitorStore(char *d) : dir(d) {
+  }
+  ~MonitorStore() {
+  }
+
+  void mkfs();  // wipe
+  void mount();
+
+  // ints (stored as ascii)
+  version_t get_int(const char *a, const char *b=0);
+  void put_int(version_t v, const char *a, const char *b=0);
+
+  // buffers
+  // ss and sn varieties.
+  bool exists_bl_ss(const char *a, const char *b=0);
+  int get_bl_ss(bufferlist& bl, const char *a, const char *b);
+  int put_bl_ss(bufferlist& bl, const char *a, const char *b);
+  bool exists_bl_sn(const char *a, version_t b) {
+    char bs[20];
+    sprintf(bs, "%llu", b);
+    return exists_bl_ss(a, bs);
+  }
+  int get_bl_sn(bufferlist& bl, const char *a, version_t b) {
+    char bs[20];
+    sprintf(bs, "%llu", b);
+    return get_bl_ss(bl, a, bs);
+  }
+  int put_bl_sn(bufferlist& bl, const char *a, version_t b) {
+    char bs[20];
+    sprintf(bs, "%llu", b);
+    return put_bl_ss(bl, a, bs);
+  }
+
+  /*
+  version_t get_incarnation() { return get_int("incarnation"); }
+  void set_incarnation(version_t i) { set_int(i, "incarnation"); }
+  
+  version_t get_last_proposal() { return get_int("last_proposal"); }
+  void set_last_proposal(version_t i) { set_int(i, "last_proposal"); }
+  */
+};
+
+
+#endif
index 7a19fb910a67d8ee18a8d7f07627555e0a8a4f2e..43ec4eddf2eca8fbde1b6e53bce523415509c757 100644 (file)
@@ -15,7 +15,7 @@
 #include "Monitor.h"
 #include "MDSMonitor.h"
 
-#include "osd/ObjectStore.h"
+#include "MonitorStore.h"
 
 #include "messages/MOSDFailure.h"
 #include "messages/MOSDMap.h"
@@ -91,6 +91,7 @@ void OSDMonitor::fake_reorg()
 
 
 
+/*
 void OSDMonitor::init()
 {
   // start with blank map
@@ -106,6 +107,7 @@ void OSDMonitor::init()
     pending_inc.epoch = osdmap.get_epoch()+1;
   }
 }
+*/
 
 
 
@@ -224,20 +226,18 @@ void OSDMonitor::create_initial()
 
 bool OSDMonitor::get_map_bl(epoch_t epoch, bufferlist& bl)
 {
-  object_t oid(Monitor::INO_OSD_MAP, epoch);
-  if (!mon->store->exists(oid))
+  if (!mon->store->exists_bl_sn("osdmap", epoch))
     return false;
-  int r = mon->store->read(oid, 0, 0, bl);
+  int r = mon->store->get_bl_sn(bl, "osdmap", epoch);
   assert(r > 0);
   return true;  
 }
 
 bool OSDMonitor::get_inc_map_bl(epoch_t epoch, bufferlist& bl)
 {
-  object_t oid(Monitor::INO_OSD_INC_MAP, epoch);
-  if (!mon->store->exists(oid))
+  if (!mon->store->exists_bl_sn("osdincmap", epoch))
     return false;
-  int r = mon->store->read(oid, 0, 0, bl);
+  int r = mon->store->get_bl_sn(bl, "osdincmap", epoch);
   assert(r > 0);
   return true;  
 }
@@ -248,11 +248,8 @@ void OSDMonitor::save_map()
   bufferlist bl;
   osdmap.encode(bl);
 
-  ObjectStore::Transaction t;
-  t.write(object_t(Monitor::INO_OSD_MAP,0), 0, bl.length(), bl);
-  t.write(object_t(Monitor::INO_OSD_MAP,osdmap.get_epoch()), 0, bl.length(), bl); 
-  mon->store->apply_transaction(t);
-  mon->store->sync();
+  mon->store->put_bl_sn(bl, "osdmap", osdmap.get_epoch());
+  mon->store->put_int(osdmap.get_epoch(), "osd_epoch");
 }
 
 void OSDMonitor::save_inc_map(OSDMap::Incremental &inc)
@@ -263,12 +260,9 @@ void OSDMonitor::save_inc_map(OSDMap::Incremental &inc)
   bufferlist incbl;
   inc.encode(incbl);
 
-  ObjectStore::Transaction t;
-  t.write(object_t(Monitor::INO_OSD_MAP,0), 0, bl.length(), bl);
-  t.write(object_t(Monitor::INO_OSD_MAP,osdmap.get_epoch()), 0, bl.length(), bl);         // not strictly needed??
-  t.write(object_t(Monitor::INO_OSD_INC_MAP,osdmap.get_epoch()), 0, incbl.length(), incbl); 
-  mon->store->apply_transaction(t);
-  mon->store->sync();
+  mon->store->put_bl_sn(bl, "osdmap", osdmap.get_epoch());
+  mon->store->put_bl_sn(incbl, "osdincmap", osdmap.get_epoch());
+  mon->store->put_int(osdmap.get_epoch(), "osd_epoch");
 }
 
 
@@ -364,6 +358,21 @@ void OSDMonitor::fake_osd_failure(int osd, bool down)
   bcast_latest_mds();
 }
 
+void OSDMonitor::mark_all_down()
+{
+  dout(7) << "mark_all_down" << endl;
+
+  for (set<int>::iterator it = osdmap.get_osds().begin();
+       it != osdmap.get_osds().end();
+       it++) {
+    if (osdmap.is_down(*it)) continue;
+    pending_inc.new_down[*it] = osdmap.get_inst(*it);
+  }
+  accept_pending();
+}
+
+
+
 
 void OSDMonitor::handle_osd_boot(MOSDBoot *m)
 {
@@ -622,6 +631,26 @@ void OSDMonitor::election_finished()
 {
   dout(10) << "election_finished" << endl;
 
+  if (mon->is_leader()) {
+    if (g_conf.mkfs) {
+      create_initial();
+      save_map();
+    } else {
+      //
+      epoch_t epoch = mon->store->get_int("osd_epoch");
+      dout(10) << " last epoch was " << epoch << endl;
+      bufferlist bl, blinc;
+      int r = mon->store->get_bl_sn(bl, "osdmap", epoch);
+      assert(r>0);
+      osdmap.decode(bl);
+
+      // pending_inc
+      pending_inc.epoch = epoch+1;
+    }
+
+  }
+
+  /*
   state = STATE_INIT;
 
   // map?
@@ -644,7 +673,7 @@ void OSDMonitor::election_finished()
     //messenger->send_message(new MMonOSDMapInfo(osdmap.epoch, osdmap.mon_epoch),
     //                     mon->monmap->get_inst(mon->leader));
   }
-  
+  */
 }
 
 
index d2e6b2e284118d3b1bdaab9809e78f11ad5c4a1b..bf393f17d9f7aee6f7e6e7e524ce4c2a5729adc9 100644 (file)
@@ -60,7 +60,7 @@ private:
   int state;
   utime_t lease_expire;     // when lease expires
   
-  void init();
+  //void init();
 
   // maps
   void accept_pending();   // accept pending, new map.
@@ -89,7 +89,7 @@ private:
   OSDMonitor(Monitor *mn, Messenger *m, Mutex& l) : 
     mon(mn), messenger(m), lock(l),
     state(STATE_SYNC) {
-    init();
+    //init();
   }
 
   void dispatch(Message *m);
@@ -100,6 +100,8 @@ private:
 
   void issue_leases();
 
+  void mark_all_down();
+
   void fake_osd_failure(int osd, bool down);
   void fake_osdmap_update();
   void fake_reorg();
diff --git a/trunk/ceph/mon/Paxos.cc b/trunk/ceph/mon/Paxos.cc
new file mode 100644 (file)
index 0000000..67c4e2e
--- /dev/null
@@ -0,0 +1,182 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "Paxos.h"
+#include "Monitor.h"
+#include "MonitorStore.h"
+
+#include "messages/MMonPaxos.h"
+
+#include "config.h"
+#undef dout
+#define  derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") "
+#define  dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") "
+
+
+// ---------------------------------
+// proposer
+void Paxos::propose(version_t v, bufferlist& value)
+{
+//todo high rf
+}
+  
+void Paxos::handle_last(MMonPaxos *m)
+{
+//todo high rf
+  dout(10) << "handle_last " << *m << endl;
+  delete m;
+}
+
+void Paxos::handle_accept(MMonPaxos *m)
+{
+//todo high rf
+  dout(10) << "handle_accept " << *m << endl;
+  delete m;
+  
+}
+
+void Paxos::handle_ack(MMonPaxos *m)
+{
+//todo high rf
+  dout(10) << "handle_ack " << *m << endl;
+  delete m;
+}
+
+void Paxos::handle_old_round(MMonPaxos *m)
+{
+//todo high rf
+  dout(10) << "handle_old_round " << *m << endl;
+  delete m;
+}
+  
+
+/*
+ * return a globally unique, monotonically increasing proposal number
+ */
+version_t Paxos::get_new_proposal_number(version_t gt)
+{
+  // read last
+  version_t last = mon->store->get_int("last_paxos_proposal");
+  if (last < gt) 
+    last = gt;
+  
+  // update
+  last /= 100;
+  last++;
+
+  // make it unique among all monitors.
+  version_t pn = last*100 + (version_t)whoami;
+  
+  // write
+  mon->store->put_int(pn, "last_paxos_proposal");
+
+  dout(10) << "get_new_proposal_number = " << pn << endl;
+  return pn;
+}
+
+
+// ---------------------------------
+// accepter
+void Paxos::handle_collect(MMonPaxos *m)
+{
+//todo high rf
+  // ...
+
+  delete m;
+}
+
+
+
+
+// ---------------------------------
+// learner
+void Paxos::handle_success(MMonPaxos *m)
+{
+  //todo high rf
+  delete m;
+}
+
+void Paxos::handle_begin(MMonPaxos *m)
+{
+  //todo high rf
+  delete m;
+}
+
+// ---------------------------------
+
+void Paxos::leader_start()
+{
+  dout(10) << "i am the leader" << endl;
+
+  // .. do something else too 
+  version_t pn = get_new_proposal_number();
+  for (int i=0; i<mon->monmap->num_mon; ++i) {
+    if (i == whoami) continue;
+    // todo high rf I pass the pn twice... what is the last parameter for?
+    mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_COLLECT, whoami, pn, pn),
+                                mon->monmap->get_inst(i));
+  }
+}
+
+
+
+void Paxos::dispatch(Message *m)
+{
+  switch (m->get_type()) {
+       
+  case MSG_MON_PAXOS:
+    {
+      MMonPaxos *pm = (MMonPaxos*)m;
+      
+      // NOTE: these ops are defined in messages/MMonPaxos.h
+      switch (pm->op) {
+       // learner
+      case MMonPaxos::OP_COLLECT:
+       handle_collect(pm);
+       break;
+       
+      case MMonPaxos::OP_LAST:
+       handle_last(pm);
+       break;
+       
+      case MMonPaxos::OP_OLDROUND:
+       handle_old_round(pm);
+       break;
+       
+      case MMonPaxos::OP_BEGIN:
+       handle_begin(pm);
+       break;
+       
+      case MMonPaxos::OP_ACCEPT:
+       handle_accept(pm);
+       break;          
+       
+      case MMonPaxos::OP_SUCCESS:
+       handle_success(pm);
+       break;
+       
+      case MMonPaxos::OP_ACK:
+       handle_ack(pm);
+       break;
+       
+         default:
+           assert(0);
+      }
+    }
+    break;
+    
+  default:
+    assert(0);
+  }
+}
+
diff --git a/trunk/ceph/mon/Paxos.h b/trunk/ceph/mon/Paxos.h
new file mode 100644 (file)
index 0000000..52a509d
--- /dev/null
@@ -0,0 +1,73 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MON_PAXOS_H
+#define __MON_PAXOS_H
+
+#include "include/types.h"
+#include "include/buffer.h"
+#include "msg/Message.h"
+
+#include "include/Context.h"
+
+#include "common/Timer.h"
+
+class Monitor;
+class MMonPaxos;
+
+// i am one state machine.
+class Paxos {
+  Monitor *mon;
+  int whoami;
+
+  // my state machine info
+  int machine_id;
+  const char *machine_name;
+  map<version_t, bufferlist> accepted_values;
+  map<version_t, int>        accepted_proposal_number;
+
+  // proposer
+  void propose(version_t v, bufferlist& value);
+  
+  void handle_last(MMonPaxos*);
+  void handle_accept(MMonPaxos*);
+  void handle_ack(MMonPaxos*);
+  void handle_old_round(MMonPaxos*);
+  
+  version_t get_new_proposal_number(version_t gt=0);
+  
+  // accepter
+  void handle_collect(MMonPaxos*);
+
+  // learner
+  void handle_success(MMonPaxos*);
+  void handle_begin(MMonPaxos*);
+  
+
+public:
+  Paxos(Monitor *m, int w,
+       int mid,const char *mnm) : mon(m), whoami(w), 
+                                  machine_id(mid), machine_name(mnm) {
+  }
+
+  void dispatch(Message *m);
+
+  void leader_start();
+
+};
+
+
+
+#endif
+
index 97e59bbb7819d6f66630ef79290a0cdf0d73fb4d..8b6fe923814276671561fcb3d017b20f1d4129f1 100644 (file)
@@ -27,7 +27,7 @@ class Dispatcher {
   virtual void dispatch(Message *m) = 0;
 
   // how i deal with transmission failures.
-  virtual void ms_handle_failure(Message *m, entity_name_t dest, const entity_addr_t& addr) { delete m; }
+  virtual void ms_handle_failure(Message *m, const entity_inst_t& inst) { delete m; }
 };
 
 #endif
index 26a9eff1d11713a124d1c72127e71b1e52242ed2..d2db8c8f7e11c55c063570169ac767a7b0c7c909 100644 (file)
@@ -217,7 +217,9 @@ FakeMessenger::FakeMessenger(entity_name_t me)  : Messenger(me)
   {
     // assign rank
     _myinst.name = me;
-    _myinst.addr.nonce = nranks++;
+    _myinst.addr.port = nranks++;
+    //if (!me.is_mon())
+    //_myinst.addr.nonce = getpid();
 
     // add to directory
     directory[ _myinst.addr ] = this;
@@ -323,7 +325,7 @@ int FakeMessenger::send_message(Message *m, entity_inst_t inst, int port, int fr
           ++p) {
        dout(1) << "** have " << p->first << " to " << p->second << endl;
       }
-      assert(dm);
+      //assert(dm);
     }
     dm->queue_incoming(m);
 
index 0e2381a1823bebff5ff6ed4c73f6f50c95228199..ae01d9106ddafefc98ba5df6b8077fddb6a87883 100644 (file)
@@ -19,6 +19,8 @@ using namespace std;
 #include "messages/MNSFailure.h"
 */
 
+#include "messages/MMonPaxos.h"
+
 #include "messages/MMonElectionAck.h"
 #include "messages/MMonElectionPropose.h"
 #include "messages/MMonElectionVictory.h"
@@ -156,6 +158,10 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
     break;
        */
 
+  case MSG_MON_PAXOS:
+    m = new MMonPaxos;
+    break;
+
   case MSG_MON_ELECTION_PROPOSE:
     m = new MMonElectionPropose;
     break;
index bb5cd1bc6dd8d9656daed77a6943f476be25a789..80e1b9feaac28064192fcdb4efb6b9ec4cefaa87 100644 (file)
@@ -36,6 +36,7 @@
 #define MSG_SHUTDOWN    99999
 
 
+
 #define MSG_MON_ELECTION_ACK       15
 #define MSG_MON_ELECTION_PROPOSE   16
 #define MSG_MON_ELECTION_VICTORY   17
@@ -47,6 +48,8 @@
 #define MSG_MON_OSDMAP_UPDATE_ACK      24
 #define MSG_MON_OSDMAP_UPDATE_COMMIT   25
 
+#define MSG_MON_PAXOS              30
+
 #define MSG_OSD_OP           40    // delete, etc.
 #define MSG_OSD_OPREPLY      41    // delete, etc.
 #define MSG_OSD_PING         42
@@ -252,11 +255,15 @@ public:
 
   // source/dest
   entity_inst_t& get_dest_inst() { return env.dst; }
+  void set_dest_inst(entity_inst_t& inst) { env.dst = inst; }
+
   entity_inst_t& get_source_inst() { return env.src; }
+  void set_source_inst(entity_inst_t& inst) { env.src = inst; }
 
   entity_name_t& get_dest() { return env.dst.name; }
   void set_dest(entity_name_t a, int p) { env.dst.name = a; env.dest_port = p; }
   int get_dest_port() { return env.dest_port; }
+  void set_dest_port(int p) { env.dest_port = p; }
   
   entity_name_t& get_source() { return env.src.name; }
   void set_source(entity_name_t a, int p) { env.src.name = a; env.source_port = p; }
index 524a65fe76ce512f2daef511bcac8d46d3378b32..ec19e41bb45818b707e9470dc230773d0d57541b 100644 (file)
@@ -55,7 +55,7 @@ void Rank::sigint()
   lock.Lock();
   derr(0) << "got control-c, exiting" << endl;
   ::close(accepter.listen_sd);
-  exit(-1);
+  _exit(-1);
   lock.Unlock();
 }
 
@@ -84,13 +84,11 @@ int Rank::Accepter::start()
   socklen_t llen = sizeof(rank.listen_addr);
   getsockname(listen_sd, (sockaddr*)&rank.listen_addr, &llen);
   
-  int myport = ntohs(rank.listen_addr.sin_port);
-  dout(10) << "accepter.start bound to port " << myport << endl;
+  dout(10) << "accepter.start bound to " << rank.listen_addr << endl;
 
   // listen!
   rc = ::listen(listen_sd, 1000);
   assert(rc >= 0);
-  //dout(10) << "accepter.start listening on " << myport << endl;
   
   // my address is...  HELP HELP HELP!
   char host[100];
@@ -102,24 +100,20 @@ int Rank::Accepter::start()
 
   // figure out my_addr
   if (g_my_addr.port > 0) {
-    // user specified it, easy.
+    // user specified it, easy peasy.
     rank.my_addr = g_my_addr;
   } else {
-    // try to figure out what ip i can be reached out
-    memset(&rank.listen_addr, 0, sizeof(rank.listen_addr));
-
     // look up my hostname.  blech!  this sucks.
     rank.listen_addr.sin_family = myhostname->h_addrtype;
     memcpy((char *) &rank.listen_addr.sin_addr.s_addr, 
           myhostname->h_addr_list[0], 
           myhostname->h_length);
-    rank.listen_addr.sin_port = htons(myport);
+
+    // set up my_addr with a nonce
     rank.my_addr.set_addr(rank.listen_addr);
+    rank.my_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
   }
-  
-  // set a nonce
-  rank.my_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
-  
+
   dout(10) << "accepter.start my addr is " << rank.my_addr << endl;
 
   // set up signal handler
@@ -688,8 +682,8 @@ void Rank::Pipe::fail(list<Message*>& out)
       for (list<Message*>::iterator k = j->second.begin();
            k != j->second.end();
            ++k) {
-       derr(1) << "pipe(" << peer_addr << ' ' << this << ").fail on " << **k << " to " << j->first << " inst " << peer_addr << endl;
-        i->first->ms_handle_failure(*k, j->first, peer_addr);
+       derr(1) << "pipe(" << peer_addr << ' ' << this << ").fail on " << **k << " to " << (*k)->get_dest_inst() << endl;
+        i->first->ms_handle_failure(*k, (*k)->get_dest_inst());
       }
 }
 
@@ -1117,7 +1111,8 @@ int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest,
   // set envelope
   m->set_source(get_myname(), fromport);
   m->set_source_addr(rank.my_addr);
-  m->set_dest(dest.name, port);
+  m->set_dest_inst(dest);
+  m->set_dest_port(port);
  
   dout(1) << m->get_source()
           << " --> " << dest.name << " " << dest.addr
index 5fd8d780b291eda0ac2d6329b94af0e737f3a4ff..0b92df47020d0d0b7c26afc3b9b3afd75c74c9d7 100644 (file)
@@ -109,12 +109,13 @@ struct entity_addr_t {
 
   void set_addr(tcpaddr_t a) {
     memcpy((char*)ipq, (char*)&a.sin_addr.s_addr, 4);
-    port = a.sin_port;
+    port = ntohs(a.sin_port);
   }
   void make_addr(tcpaddr_t& a) const {
+    memset(&a, 0, sizeof(a));
     a.sin_family = AF_INET;
     memcpy((char*)&a.sin_addr.s_addr, (char*)ipq, 4);
-    a.sin_port = port;
+    a.sin_port = htons(port);
   }
 };
 
index 36dc01127107e3fa87067c6872f3003103a44c0a..1ff08530e4cfd0aeb5f8f180ad782ff92ec24976 100644 (file)
@@ -28,7 +28,7 @@
 #include <cassert>
 #include <errno.h>
 #include <dirent.h>
-//#include <sys/xattr.h>
+#include <sys/xattr.h>
 //#include <sys/vfs.h>
 
 #ifdef DARWIN
@@ -38,7 +38,8 @@
 
 #include "config.h"
 #undef dout
-#define  dout(l)    if (l<=g_conf.debug) cout << "osd" << whoami << ".fakestore "
+#define  dout(l)    if (l<=g_conf.debug) cout << g_clock.now() << " osd" << whoami << ".fakestore "
+#define  derr(l)    if (l<=g_conf.debug) cerr << g_clock.now() << " osd" << whoami << ".fakestore "
 
 #include "include/buffer.h"
 
@@ -54,159 +55,149 @@ using namespace __gnu_cxx;
 
 
 
+int FakeStore::statfs(struct statfs *buf)
+{
+  return ::statfs(basedir.c_str(), buf);
+}
 
 
+/* 
+ * sorry, these are sentitive to the object_t and coll_t typing.
+ */ 
+void FakeStore::get_oname(object_t oid, char *s) 
+{
+  static hash<object_t> H;
+  assert(sizeof(oid) == 16);
+  sprintf(s, "%s/objects/%02x/%016llx.%016llx", basedir.c_str(), H(oid) & HASH_MASK, 
+         *((__uint64_t*)&oid),
+         *(((__uint64_t*)&oid) + 1));
+}
 
-int FakeStore::mount() 
+void FakeStore::get_cdir(coll_t cid, char *s) 
+{
+  assert(sizeof(cid) == 8);
+  sprintf(s, "%s/collections/%016llx", basedir.c_str(), 
+         cid);
+}
+
+void FakeStore::get_coname(coll_t cid, object_t oid, char *s) 
+{
+  assert(sizeof(oid) == 16);
+  sprintf(s, "%s/collections/%016llx/%016llx.%016llx", basedir.c_str(), cid, 
+         *((__uint64_t*)&oid),
+         *(((__uint64_t*)&oid) + 1));
+}
+
+
+
+
+int FakeStore::mkfs()
 {
+  char cmd[200];
   if (g_conf.fakestore_dev) {
     dout(0) << "mounting" << endl;
-    char cmd[100];
     sprintf(cmd,"mount %s", g_conf.fakestore_dev);
     system(cmd);
   }
 
-  string mydir;
-  get_dir(mydir);
-
-  dout(5) << "init with basedir " << mydir << endl;
-
-  // make sure global base dir exists
-  struct stat st;
-  int r = ::stat(basedir.c_str(), &st);
-  if (r != 0) {
-    dout(1) << "unable to stat basedir " << basedir << ", r = " << r << endl;
-    return r;
-  }
+  dout(1) << "mkfs in " << basedir << endl;
 
-  // all okay.
-  return 0;
-}
+  // wipe
+  sprintf(cmd, "test -d %s && rm -r %s ; mkdir -p %s/collections && mkdir -p %s/objects",
+         basedir.c_str(), basedir.c_str(), basedir.c_str(), basedir.c_str());
+  
+  dout(5) << "wipe: " << cmd << endl;
+  system(cmd);
 
-int FakeStore::umount() 
-{
-  dout(5) << "finalize" << endl;
+  // hashed bits too
+  for (int i=0; i<HASH_DIRS; i++) {
+    char s[4];
+    sprintf(s, "%02x", i);
+    string subdir = basedir + "/objects/" + s;
 
+    dout(15) << " creating " << subdir << endl;
+    int r = ::mkdir(subdir.c_str(), 0755);
+    if (r != 0) {
+      derr(0) << "couldnt create subdir, r = " << r << endl;
+      return r;
+    }
+  }
+  
   if (g_conf.fakestore_dev) {
     char cmd[100];
     dout(0) << "umounting" << endl;
     sprintf(cmd,"umount %s", g_conf.fakestore_dev);
-    system(cmd);
+    //system(cmd);
   }
 
-  // nothing
-  return 0;
-}
-
-
-int FakeStore::statfs(struct statfs *buf)
-{
-  string mydir;
-  get_dir(mydir);
-  return ::statfs(mydir.c_str(), buf);
-}
-
-
-
+  dout(1) << "mkfs done in " << basedir << endl;
 
-void FakeStore::get_dir(string& dir) {
-  char s[30];
-  sprintf(s, "%d", whoami);
-  dir = basedir + "/" + s;
-}
-void FakeStore::get_oname(object_t oid, string& fn) {
-  char s[100];
-  static hash<object_t> H;
-  sprintf(s, "%d/%02x/%016llx.%08x.%d", whoami, H(oid) & HASH_MASK, oid.ino, oid.bno, oid.rev);
-  fn = basedir + "/" + s;
-  //  dout(1) << "oname is " << fn << endl;
-}
-
-
-
-void FakeStore::wipe_dir(string mydir)
-{
-  DIR *dir = ::opendir(mydir.c_str());
-  if (dir) {
-    dout(10) << "wiping " << mydir << endl;
-    struct dirent *ent = 0;
-    
-    while ((ent = ::readdir(dir)) != 0) {
-      if (ent->d_name[0] == '.') continue;
-      dout(25) << "mkfs unlinking " << ent->d_name << endl;
-      string fn = mydir + "/" + ent->d_name;
-      ::unlink(fn.c_str());
-    }    
-    
-    ::closedir(dir);
-  } else {
-    dout(1) << "mkfs couldn't read dir " << mydir << endl;
-  }
+  return 0;
 }
 
-int FakeStore::mkfs()
+int FakeStore::mount() 
 {
   if (g_conf.fakestore_dev) {
     dout(0) << "mounting" << endl;
     char cmd[100];
     sprintf(cmd,"mount %s", g_conf.fakestore_dev);
-    system(cmd);
+    //system(cmd);
   }
 
-
-  int r = 0;
+  dout(5) << "basedir " << basedir << endl;
+  
+  // make sure global base dir exists
   struct stat st;
-  string mydir;
-  get_dir(mydir);
-
-  dout(1) << "mkfs in " << mydir << endl;
-
-
-  // make sure my dir exists
-  r = ::stat(mydir.c_str(), &st);
+  int r = ::stat(basedir.c_str(), &st);
   if (r != 0) {
-    dout(10) << "creating " << mydir << endl;
-    mkdir(mydir.c_str(), 0755);
-    r = ::stat(mydir.c_str(), &st);
-    if (r != 0) {
-      dout(1) << "couldnt create dir, r = " << r << endl;
-      return r;
-    }
+    derr(0) << "unable to stat basedir " << basedir << ", r = " << r << endl;
+    return r;
+  }
+  
+  if (g_conf.fakestore_fake_collections) {
+    dout(0) << "faking collections (in memory)" << endl;
+    fake_collections = true;
   }
-  else wipe_dir(mydir);
 
-  // hashed bits too
-  for (int i=0; i<HASH_DIRS; i++) {
-    char s[4];
-    sprintf(s, "%02x", i);
-    string subdir = mydir + "/" + s;
-    r = ::stat(subdir.c_str(), &st);
-    if (r != 0) {
-      dout(2) << " creating " << subdir << endl;
-      ::mkdir(subdir.c_str(), 0755);
-      r = ::stat(subdir.c_str(), &st);
-      if (r != 0) {
-        dout(1) << "couldnt create subdir, r = " << r << endl;
-        return r;
-      }
+  // fake attrs? 
+  // let's test to see if they work.
+  if (g_conf.fakestore_fake_attrs) {
+    dout(0) << "faking attrs (in memory)" << endl;
+    fake_attrs = true;
+  } else {
+    char names[1000];
+    r = ::listxattr(basedir.c_str(), names, 1000);
+    if (r < 0) {
+      derr(0) << "xattrs don't appear to work (" << strerror(errno) << "), specify --fakestore_fake_attrs to fake them (in memory)." << endl;
+      assert(0);
     }
-    else
-      wipe_dir( subdir );
   }
+
+  // all okay.
+  return 0;
+}
+
+int FakeStore::umount() 
+{
+  dout(5) << "umount " << basedir << endl;
   
+  sync();
+
   if (g_conf.fakestore_dev) {
     char cmd[100];
     dout(0) << "umounting" << endl;
     sprintf(cmd,"umount %s", g_conf.fakestore_dev);
-    system(cmd);
+    //system(cmd);
   }
 
-  dout(1) << "mkfs done in " << mydir << endl;
-
-  return r;
+  // nothing
+  return 0;
 }
 
 
+// --------------------
+// objects
+
 
 bool FakeStore::exists(object_t oid)
 {
@@ -222,9 +213,9 @@ int FakeStore::stat(object_t oid,
                     struct stat *st)
 {
   dout(20) << "stat " << oid << endl;
-  string fn;
+  char fn[200];
   get_oname(oid,fn);
-  int r = ::stat(fn.c_str(), st);
+  int r = ::stat(fn, st);
   return r;
 }
  
@@ -233,9 +224,9 @@ int FakeStore::stat(object_t oid,
 int FakeStore::remove(object_t oid, Context *onsafe) 
 {
   dout(20) << "remove " << oid << endl;
-  string fn;
+  char fn[200];
   get_oname(oid,fn);
-  int r = ::unlink(fn.c_str());
+  int r = ::unlink(fn);
   if (onsafe) sync(onsafe);
   return r;
 }
@@ -244,9 +235,9 @@ int FakeStore::truncate(object_t oid, off_t size, Context *onsafe)
 {
   dout(20) << "truncate " << oid << " size " << size << endl;
 
-  string fn;
+  char fn[200];
   get_oname(oid,fn);
-  int r = ::truncate(fn.c_str(), size);
+  int r = ::truncate(fn, size);
   if (onsafe) sync(onsafe);
   return r;
 }
@@ -256,12 +247,12 @@ int FakeStore::read(object_t oid,
                     bufferlist& bl) {
   dout(20) << "read " << oid << " len " << len << " off " << offset << endl;
 
-  string fn;
+  char fn[200];
   get_oname(oid,fn);
   
-  int fd = ::open(fn.c_str(), O_RDONLY);
+  int fd = ::open(fn, O_RDONLY);
   if (fd < 0) {
-    dout(10) << "read couldn't open " << fn.c_str() << " errno " << errno << " " << strerror(errno) << endl;
+    dout(10) << "read couldn't open " << fn << " errno " << errno << " " << strerror(errno) << endl;
     return fd;
   }
   ::flock(fd, LOCK_EX);    // lock for safety
@@ -271,7 +262,7 @@ int FakeStore::read(object_t oid,
 
   if (len == 0) {
     struct stat st;
-    fstat(fd, &st);
+    ::fstat(fd, &st);
     len = st.st_size;
   }
 
@@ -294,22 +285,22 @@ int FakeStore::write(object_t oid,
 {
   dout(20) << "write " << oid << " len " << len << " off " << offset << endl;
 
-  string fn;
+  char fn[200];
   get_oname(oid,fn);
   
-  ::mknod(fn.c_str(), 0644, 0);  // in case it doesn't exist yet.
+  ::mknod(fn, 0644, 0);  // in case it doesn't exist yet.
 
   int flags = O_WRONLY;//|O_CREAT;
-  int fd = ::open(fn.c_str(), flags);
+  int fd = ::open(fn, flags);
   if (fd < 0) {
-    dout(1) << "write couldn't open " << fn.c_str() << " flags " << flags << " errno " << errno << " " << strerror(errno) << endl;
+    derr(0) << "write couldn't open " << fn << " flags " << flags << " errno " << errno << " " << strerror(errno) << endl;
     return fd;
   }
+  ::fchmod(fd, 0664);
   ::flock(fd, LOCK_EX);    // lock for safety
-  //::fchmod(fd, 0664);
   
   // seek
-  off_t actual = lseek(fd, offset, SEEK_SET);
+  off_t actual = ::lseek(fd, offset, SEEK_SET);
   int did = 0;
   assert(actual == offset);
 
@@ -321,12 +312,12 @@ int FakeStore::write(object_t oid,
     if (r > 0)
       did += r;
     else {
-      dout(1) << "couldn't write to " << fn.c_str() << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
+      derr(0) << "couldn't write to " << fn << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
     }
   }
   
   if (did < 0) {
-    dout(1) << "couldn't write to " << fn.c_str() << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
+    derr(0) << "couldn't write to " << fn << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
   }
 
   ::flock(fd, LOCK_UN);
@@ -341,24 +332,44 @@ int FakeStore::write(object_t oid,
 
 
 class C_FakeSync : public Context {
-public:
   Context *c;
   int *n;
-  C_FakeSync(Context *c_, int *n_) : c(c_), n(n_) {
+  Mutex *lock;
+  Cond *cond;
+
+public:
+  C_FakeSync(Context *c_, int *n_, Mutex *lo, Cond *co) : 
+    c(c_), n(n_),
+    lock(lo), cond(co) {
+    lock->Lock();
     ++*n;
+    lock->Unlock();
   }
   void finish(int r) {
     c->finish(r);
+    
+    lock->Lock();
     --(*n);
-    //cout << "sync, " << *n << " still unsync" << endl;
+    if (*n == 0) cond->Signal();
+    lock->Unlock();
   }
 };
 
+void FakeStore::sync()
+{
+  synclock.Lock();
+  while (unsync > 0) {
+    dout(0) << "sync waiting for " << unsync << " items to (fake) sync" << endl;
+    synccond.Wait(synclock);
+  }
+  synclock.Unlock();
+}
+
 void FakeStore::sync(Context *onsafe)
 {
   if (g_conf.fakestore_fake_sync) {
     g_timer.add_event_after((float)g_conf.fakestore_fake_sync,
-                            new C_FakeSync(onsafe, &unsync));
+                            new C_FakeSync(onsafe, &unsync, &synclock, &synccond));
     
   } else {
     assert(0); // der..no implemented anymore
@@ -366,4 +377,250 @@ void FakeStore::sync(Context *onsafe)
 }
 
 
+// -------------------------------
+// attributes
+
+// objects
+
+int FakeStore::setattr(object_t oid, const char *name,
+                      const void *value, size_t size,
+                      Context *onsafe) 
+{
+  if (fake_attrs) return attrs.setattr(oid, name, value, size, onsafe);
+
+  char fn[100];
+  get_oname(oid, fn);
+  int r = ::setxattr(fn, name, value, size, 0);
+  return r;
+}
+
+int FakeStore::setattrs(object_t oid, map<string,bufferptr>& aset) 
+{
+  if (fake_attrs) return attrs.setattrs(oid, aset);
+
+  char fn[100];
+  get_oname(oid, fn);
+  int r = 0;
+  for (map<string,bufferptr>::iterator p = aset.begin();
+       p != aset.end();
+       ++p) {
+    r = ::setxattr(fn, p->first.c_str(), p->second.c_str(), p->second.length(), 0);
+    if (r < 0) break;
+  }
+  return r;
+}
+
+int FakeStore::getattr(object_t oid, const char *name,
+                      void *value, size_t size) 
+{
+  if (fake_attrs) return attrs.getattr(oid, name, value, size);
+  char fn[100];
+  get_oname(oid, fn);
+  int r = ::getxattr(fn, name, value, size);
+  return r;
+}
+
+int FakeStore::getattrs(object_t oid, map<string,bufferptr>& aset) 
+{
+  if (fake_attrs) return attrs.getattrs(oid, aset);
+
+  char fn[100];
+  get_oname(oid, fn);
+
+  char val[1000];
+  char names[1000];
+  int num = ::listxattr(fn, names, 1000);
+  
+  char *name = names;
+  for (int i=0; i<num; i++) {
+    dout(0) << "getattrs " << oid << " getting " << (i+1) << "/" << num << " '" << names << "'" << endl;
+    int l = ::getxattr(fn, name, val, 1000);
+    dout(0) << "getattrs " << oid << " getting " << (i+1) << "/" << num << " '" << names << "' = " << l << " bytes" << endl;
+    aset[names].append(val, l);
+    name += strlen(name) + 1;
+  }
+  
+  return 0;
+}
+
+int FakeStore::rmattr(object_t oid, const char *name, Context *onsafe) 
+{
+  if (fake_attrs) return attrs.rmattr(oid, name, onsafe);
+  char fn[100];
+  get_oname(oid, fn);
+  int r = ::removexattr(fn, name);
+  return r;
+}
+
+/*
+int FakeStore::listattr(object_t oid, char *attrls, size_t size) 
+{
+  if (fake_attrs) return attrs.listattr(oid, attrls, size);
+  char fn[100];
+  get_oname(oid, fn);
+  return ::listxattr(fn, attrls, size);
+}
+*/
+
+
+// collections
+
+int FakeStore::collection_setattr(coll_t c, const char *name,
+                                 void *value, size_t size,
+                                 Context *onsafe) 
+{
+  if (fake_attrs) return attrs.collection_setattr(c, name, value, size, onsafe);
+  return 0;
+}
+
+int FakeStore::collection_rmattr(coll_t c, const char *name,
+                                Context *onsafe) 
+{
+  if (fake_attrs) return attrs.collection_rmattr(c, name, onsafe);
+  return 0;
+}
+
+int FakeStore::collection_getattr(coll_t c, const char *name,
+                                 void *value, size_t size) 
+{
+  if (fake_attrs) return attrs.collection_getattr(c, name, value, size);
+  return 0;
+}
+
+/*
+int FakeStore::collection_listattr(coll_t c, char *attrs, size_t size) 
+{
+  if (fake_attrs) return collection_listattr(c, attrs, size);
+  return 0;
+}
+*/
+
+// --------------------------
+// collections
+
+int FakeStore::list_collections(list<coll_t>& ls) 
+{
+  if (fake_collections) return collections.list_collections(ls);
+
+  char fn[200];
+  sprintf(fn, "%s/collections", basedir.c_str());
+
+  DIR *dir = ::opendir(fn);
+  assert(dir);
+
+  struct dirent *de;
+  while ((de = ::readdir(dir)) != 0) {
+    // parse
+    coll_t c = strtoll(de->d_name, 0, 16);
+    dout(0) << " got " << c << " errno " << errno << " on " << de->d_name << endl;
+    if (errno) continue;
+    ls.push_back(c);
+  }
+  
+  ::closedir(dir);
+  return 0;
+}
+
+int FakeStore::create_collection(coll_t c,
+                                Context *onsafe) 
+{
+  if (fake_collections) return collections.create_collection(c, onsafe);
+  
+  char fn[200];
+  get_cdir(c, fn);
+
+  int r = ::mkdir(fn, 0755);
+
+  if (onsafe) sync(onsafe);
+  return r;
+}
+
+int FakeStore::destroy_collection(coll_t c,
+                                 Context *onsafe) 
+{
+  if (fake_collections) return collections.destroy_collection(c, onsafe);
+
+  char fn[200];
+  get_cdir(c, fn);
+  char cmd[200];
+  sprintf(cmd, "test -d %s && rm -r %s", fn, fn);
+  system(cmd);
+
+  if (onsafe) sync(onsafe);
+  return 0;
+}
+
+int FakeStore::collection_stat(coll_t c, struct stat *st) 
+{
+  if (fake_collections) return collections.collection_stat(c, st);
+
+  char fn[200];
+  get_cdir(c, fn);
+  return ::lstat(fn, st);
+}
+
+bool FakeStore::collection_exists(coll_t c) 
+{
+  if (fake_collections) return collections.collection_exists(c);
+
+  struct stat st;
+  return collection_stat(c, &st) == 0;
+}
+
+
+int FakeStore::collection_add(coll_t c, object_t o,
+                             Context *onsafe) 
+{
+  if (fake_collections) return collections.collection_add(c, o, onsafe);
+
+  char cof[200];
+  get_coname(c, o, cof);
+  char of[200];
+  get_oname(o, of);
+  
+  int r = ::link(of, cof);
+  if (onsafe) sync(onsafe);
+  return r;
+}
+
+int FakeStore::collection_remove(coll_t c, object_t o,
+                                Context *onsafe) 
+{
+  if (fake_collections) return collections.collection_remove(c, o, onsafe);
+
+  char cof[200];
+  get_coname(c, o, cof);
+
+  int r = ::unlink(cof);
+  if (onsafe) sync(onsafe);
+  return r;
+}
+
+int FakeStore::collection_list(coll_t c, list<object_t>& ls) 
+{  
+  if (fake_collections) return collections.collection_list(c, ls);
+
+  char fn[200];
+  get_cdir(c, fn);
+
+  DIR *dir = ::opendir(fn);
+  assert(dir);
+  
+  struct dirent *de;
+  while ((de = ::readdir(dir)) != 0) {
+    // parse
+    object_t o;
+    assert(sizeof(o) == 16);
+    *(((__uint64_t*)&o) + 0) = strtoll(de->d_name, 0, 16);
+    assert(de->d_name[16] == '.');
+    *(((__uint64_t*)&o) + 1) = strtoll(de->d_name+17, 0, 16);
+    dout(0) << " got " << o << " errno " << errno << " on " << de->d_name << endl;
+    if (errno) continue;
+    ls.push_back(o);
+  }
+  
+  ::closedir(dir);
+  return 0;
+}
 
+// eof.
index eaa4126e84e4647ec31eb891871b34a0d5af3cb6..4ad2cb4a054e84a14a81907f35d3f0c7fb860122 100644 (file)
@@ -32,31 +32,34 @@ using namespace __gnu_cxx;
 
 // fake attributes in memory, if we need to.
 
-
-class FakeStore : public ObjectStore, 
-                  public FakeStoreAttrs,
-                  public FakeStoreCollections {
+class FakeStore : public ObjectStore {
   string basedir;
   int whoami;
-  
-  int unsync;
 
-  Mutex lock;
+  Mutex synclock;
+  Cond synccond;
+  int unsync;
 
-  // fns
-  void get_dir(string& dir);
-  void get_oname(object_t oid, string& fn);
-  void wipe_dir(string mydir);
+  // fake attrs?
+  FakeStoreAttrs attrs;
+  bool fake_attrs;
 
+  // fake collections?
+  FakeStoreCollections collections;
+  bool fake_collections;
+  
+  // helper fns
+  void get_oname(object_t oid, char *s);
+  void get_cdir(coll_t cid, char *s);
+  void get_coname(coll_t cid, object_t oid, char *s);
 
  public:
-  FakeStore(char *base, int whoami) : FakeStoreAttrs(this), FakeStoreCollections(this)
-  {
-    this->basedir = base;
-    this->whoami = whoami;
-    unsync = 0;
-  }
-
+  FakeStore(char *base, int w) : 
+    basedir(base),
+    whoami(w),
+    unsync(0),
+    attrs(this), fake_attrs(false), 
+    collections(this), fake_collections(false) { }
 
   int mount();
   int umount();
@@ -73,15 +76,35 @@ class FakeStore : public ObjectStore,
   int stat(object_t oid, struct stat *st);
   int remove(object_t oid, Context *onsafe);
   int truncate(object_t oid, off_t size, Context *onsafe);
-  int read(object_t oid, 
-           off_t offset, size_t len,
-           bufferlist& bl);
-  int write(object_t oid, 
-            off_t offset, size_t len,
-            bufferlist& bl, 
-            Context *onsafe);
+  int read(object_t oid, off_t offset, size_t len, bufferlist& bl);
+  int write(object_t oid, off_t offset, size_t len, bufferlist& bl, Context *onsafe);
 
+  void sync();
   void sync(Context *onsafe);
+
+  // attrs
+  int setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe=0);
+  int setattrs(object_t oid, map<string,bufferptr>& aset);
+  int getattr(object_t oid, const char *name, void *value, size_t size);
+  int getattrs(object_t oid, map<string,bufferptr>& aset);
+  int rmattr(object_t oid, const char *name, Context *onsafe=0);
+  //int listattr(object_t oid, char *attrs, size_t size);
+  int collection_setattr(coll_t c, const char *name, void *value, size_t size, Context *onsafe=0);
+  int collection_rmattr(coll_t c, const char *name, Context *onsafe=0);
+  int collection_getattr(coll_t c, const char *name, void *value, size_t size);
+  //int collection_listattr(coll_t c, char *attrs, size_t size);
+
+
+  // collections
+  int list_collections(list<coll_t>& ls);
+  int create_collection(coll_t c, Context *onsafe=0);
+  int destroy_collection(coll_t c, Context *onsafe=0);
+  int collection_stat(coll_t c, struct stat *st);
+  bool collection_exists(coll_t c);
+  int collection_add(coll_t c, object_t o, Context *onsafe=0);
+  int collection_remove(coll_t c, object_t o, Context *onsafe=0);
+  int collection_list(coll_t c, list<object_t>& o);
+
 };
 
 #endif
index 7e3bcc5568eed2b52e5ceae445583f795c45cb37..0d075ff4123f2a6d241a5033a30bc108d40f4360 100644 (file)
@@ -167,7 +167,8 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) : timer(osd_lock)
   }
 #endif // USE_OSBDB
   else {
-    store = new FakeStore(osd_base_path, whoami); 
+    sprintf(dev_path, "osddata/osd%d", whoami);
+    store = new FakeStore(dev_path, whoami);
   }
 
 }
@@ -434,7 +435,7 @@ void OSD::_remove_pg(pg_t pgid)
          p++)
       t.remove(*p);
     t.remove_collection(pgid);
-    t.remove(object_t(1,pgid));  // log too
+    t.remove(pgid.to_object());  // log too
   }
   store->apply_transaction(t);
   
@@ -731,6 +732,7 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst)
   entity_name_t dest = inst.name;
 
   if (g_conf.ms_die_on_failure) {
+    dout(0) << "ms_handle_failure " << inst << " on " << *m << endl;
     exit(0);
   }
 
@@ -1018,6 +1020,8 @@ void OSD::advance_map(ObjectStore::Transaction& t)
     //cerr << "osdmap " << osdmap->get_ctime() << " logger start " << logger->get_start() << endl;
     logger->set_start( osdmap->get_ctime() );
 
+    assert(g_conf.osd_mkfs);  // make sure we did a mkfs!
+
     // create PGs
     for (int nrep = 1; 
          nrep <= MIN(g_conf.num_osd, g_conf.osd_max_rep);    // for low osd counts..  hackish bleh
@@ -1734,7 +1738,7 @@ void OSD::handle_pg_log(MOSDPGLog *m)
     assert(pg->missing.num_lost() == 0);
 
     // ok activate!
-     pg->activate(t);
+    pg->activate(t);
   }
 
   unsigned tr = store->apply_transaction(t);
index 70bc92dd653f740fbcef97a8a428c29f7ea23cb0..9ff94adfcae995e46271c229192a356703d65d9c 100644 (file)
@@ -466,7 +466,7 @@ public:
     return -1; 
   }
 
-  virtual int listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
+  //virtual int listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
   
   // collections
   virtual int list_collections(list<coll_t>& ls) {return 0;}//= 0;
@@ -489,10 +489,10 @@ public:
                                 Context *onsafe=0) {return 0;} //= 0;
   virtual int collection_getattr(coll_t cid, const char *name,
                                  void *value, size_t size) {return 0;} //= 0;
-  virtual int collection_listattr(coll_t cid, char *attrs, size_t size) {return 0;} //= 0;
+  //virtual int collection_listattr(coll_t cid, char *attrs, size_t size) {return 0;} //= 0;
   
-  virtual void sync(Context *onsync) {};
-  virtual void sync() {};
+  virtual void sync(Context *onsync) {}
+  virtual void sync() {}
   
   
   virtual void _fake_writes(bool b) {};
index fb87630b3306f86b2c2723add14c606703f7af6c..218f9eac36aae5b079fdb7e8662bea1770995aa3 100644 (file)
@@ -826,7 +826,7 @@ void PG::activate(ObjectStore::Transaction& t)
   state_set(STATE_ACTIVE);
   state_clear(STATE_STRAY);
   if (is_crashed()) {
-    assert(is_replay());
+    //assert(is_replay());      // HELP.. not on replica?
     state_clear(STATE_CRASHED);
     state_clear(STATE_REPLAY);
   }
@@ -1174,6 +1174,8 @@ void PG::clean_replicas()
 
 void PG::write_log(ObjectStore::Transaction& t)
 {
+  dout(10) << "write_log" << endl;
+
   // assemble buffer
   bufferlist bl;
   
@@ -1186,12 +1188,16 @@ void PG::write_log(ObjectStore::Transaction& t)
     if (bl.length() % 4096 == 0)
       ondisklog.block_map[bl.length()] = p->version;
     bl.append((char*)&(*p), sizeof(*p));
+    if (g_conf.osd_pad_pg_log) {  // pad to 4k, until i fix ebofs reallocation crap.  FIXME.
+      bufferptr bp(4096 - sizeof(*p));
+      bl.push_back(bp);
+    }
   }
   ondisklog.top = bl.length();
   
   // write it
-  t.remove( object_t(1,info.pgid) );
-  t.write( object_t(1,info.pgid) , 0, bl.length(), bl);
+  t.remove( info.pgid.to_object() );
+  t.write( info.pgid.to_object() , 0, bl.length(), bl);
   t.collection_setattr(info.pgid, "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom));
   t.collection_setattr(info.pgid, "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
   
@@ -1234,6 +1240,8 @@ void PG::trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v)
 void PG::append_log(ObjectStore::Transaction& t, PG::Log::Entry& logentry, 
                     eversion_t trim_to)
 {
+  dout(10) << "append_log " << ondisklog.top << " " << logentry << endl;
+
   // write entry on disk
   bufferlist bl;
   bl.append( (char*)&logentry, sizeof(logentry) );
@@ -1241,7 +1249,7 @@ void PG::append_log(ObjectStore::Transaction& t, PG::Log::Entry& logentry,
     bufferptr bp(4096 - sizeof(logentry));
     bl.push_back(bp);
   }
-  t.write( object_t(1,info.pgid), ondisklog.top, bl.length(), bl );
+  t.write( info.pgid.to_object(), ondisklog.top, bl.length(), bl );
   
   // update block map?
   if (ondisklog.top % 4096 == 0) 
@@ -1263,30 +1271,43 @@ void PG::append_log(ObjectStore::Transaction& t, PG::Log::Entry& logentry,
 
 void PG::read_log(ObjectStore *store)
 {
+  int r;
   // load bounds
   ondisklog.bottom = ondisklog.top = 0;
-  store->collection_getattr(info.pgid, "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom));
-  store->collection_getattr(info.pgid, "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
-  
+  r = store->collection_getattr(info.pgid, "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom));
+  assert(r == sizeof(ondisklog.bottom));
+  r = store->collection_getattr(info.pgid, "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
+  assert(r == sizeof(ondisklog.top));
+
+  dout(10) << "read_log [" << ondisklog.bottom << "," << ondisklog.top << ")" << endl;
+
   log.backlog = info.log_backlog;
   log.bottom = info.log_bottom;
   
   if (ondisklog.top > 0) {
     // read
     bufferlist bl;
-    store->read(object_t(1,info.pgid), ondisklog.bottom, ondisklog.top-ondisklog.bottom, bl);
+    store->read(info.pgid.to_object(), ondisklog.bottom, ondisklog.top-ondisklog.bottom, bl);
     
     PG::Log::Entry e;
     off_t pos = ondisklog.bottom;
+    assert(log.log.empty());
     while (pos < ondisklog.top) {
       bl.copy(pos-ondisklog.bottom, sizeof(e), (char*)&e);
+      dout(10) << "read_log " << pos << " " << e << endl;
+
       if (e.version > log.bottom || log.backlog) { // ignore items below log.bottom
         if (pos % 4096 == 0)
-          ondisklog.block_map[pos] = e.version;
+         ondisklog.block_map[pos] = e.version;
         log.log.push_back(e);
+      } else {
+       dout(10) << "read_log ignoring entry at " << pos << endl;
       }
       
-      pos += sizeof(e);
+      if (g_conf.osd_pad_pg_log)   // pad to 4k, until i fix ebofs reallocation crap.  FIXME.
+       pos += 4096;
+      else
+       pos += sizeof(e);
     }
   }
   log.top = info.last_update;
index 6d6de985eaf8e1d97b7b238dc6b1d366066ac78d..f3b00cf935f91261c0c8c31040583ec84fec5be5 100644 (file)
@@ -613,7 +613,7 @@ inline ostream& operator<<(ostream& out, const PG::Info::History& h)
 
 inline ostream& operator<<(ostream& out, const PG::Info& pgi) 
 {
-  out << "pginfo(" << hex << pgi.pgid << dec;
+  out << "pginfo(" << pgi.pgid;
   if (pgi.is_empty())
     out << " empty";
   else
@@ -669,8 +669,8 @@ inline ostream& operator<<(ostream& out, const PG& pg)
          !pg.log.backlog) ||
         (pg.log.log.rbegin()->version.version != pg.log.top.version)) {
       out << " (log bound mismatch, actual=["
-          << pg.log.log.begin()->version << ","
-          << pg.log.log.rbegin()->version << "])";
+         << pg.log.log.begin()->version << ","
+         << pg.log.log.rbegin()->version << "] len=" << pg.log.log.size() << ")";
     }
   }
 
index e86c074fa1b15727740915b3cc74b1e4b7002598..f8656e1f3e178ad4d1a92d3beda7b7d6ad83c4f3 100644 (file)
 
 #include "include/reqid.h"
 
+#define PG_INO 1
+
+
 // osd types
 typedef __uint64_t coll_t;        // collection id
 
-
 // pg stuff
 typedef __uint16_t ps_t;
 typedef __uint8_t pruleset_t;
@@ -28,13 +30,14 @@ typedef __uint8_t pruleset_t;
 struct pg_t {
   union {
     struct {
-      int         preferred;
-      ps_t        ps;
-      __uint8_t   nrep;
-      pruleset_t  ruleset;
+      __uint32_t  preferred:32; // 32
+      ps_t        ps:16;        // 16
+      __uint8_t   nrep:8;       //  8
+      pruleset_t  ruleset:8;    //  8
     } fields;
-    __uint64_t val;
+    __uint64_t val;          // 64
   } u;
+
   pg_t() { u.val = 0; }
   pg_t(const pg_t& o) { u.val = o.u.val; }
   pg_t(ps_t s, int p, unsigned char n, pruleset_t r=0) {
@@ -52,6 +55,8 @@ struct pg_t {
   pg_t operator++() { ++u.val; return *this; }
   */
   operator __uint64_t() const { return u.val; }
+
+  object_t to_object() const { return object_t(PG_INO, u.val >> 32, u.val & 0xffffffff); }
 };
 
 inline ostream& operator<<(ostream& out, pg_t pg) {
@@ -62,6 +67,8 @@ inline ostream& operator<<(ostream& out, pg_t pg) {
   if (pg.u.fields.preferred)
     out << pg.u.fields.preferred << '.';
   out << hex << pg.u.fields.ps << dec;
+  out << "=" << hex << pg.u.val << dec;
+  out << "=" << hex << (__uint64_t)pg << dec;
   return out;
 }