]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
merged r1409:1471 from trunk/ceph into branches/sage/pgs (the rest)
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 4 Jul 2007 04:34:48 +0000 (04:34 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 4 Jul 2007 04:34:48 +0000 (04:34 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1473 29311d96-e01e-0410-9327-a35deaab8ce9

42 files changed:
branches/sage/pgs/Makefile
branches/sage/pgs/TODO
branches/sage/pgs/client/Client.cc
branches/sage/pgs/client/Client.h
branches/sage/pgs/cmonctl.cc
branches/sage/pgs/config.cc
branches/sage/pgs/config.h
branches/sage/pgs/cosd.cc
branches/sage/pgs/doc/Commitdir.txt
branches/sage/pgs/doc/caching.txt
branches/sage/pgs/ebofs/BlockDevice.cc
branches/sage/pgs/ebofs/Ebofs.cc
branches/sage/pgs/ebofs/Ebofs.h
branches/sage/pgs/ebofs/FileJournal.cc
branches/sage/pgs/ebofs/FileJournal.h
branches/sage/pgs/ebofs/Journal.h
branches/sage/pgs/ebofs/test.ebofs.cc
branches/sage/pgs/ebofs/types.h
branches/sage/pgs/include/Context.h
branches/sage/pgs/include/buffer.h
branches/sage/pgs/mds/MDS.cc
branches/sage/pgs/mds/MDSMap.h
branches/sage/pgs/messages/MClientMount.h
branches/sage/pgs/messages/MClientUnmount.h
branches/sage/pgs/messages/MMDSBeacon.h
branches/sage/pgs/messages/MMonCommand.h
branches/sage/pgs/messages/MMonElection.h [new file with mode: 0644]
branches/sage/pgs/messages/MMonElectionAck.h [deleted file]
branches/sage/pgs/messages/MMonElectionPropose.h [deleted file]
branches/sage/pgs/messages/MMonElectionVictory.h [deleted file]
branches/sage/pgs/messages/MMonPaxos.h
branches/sage/pgs/messages/MOSDBoot.h
branches/sage/pgs/messages/MOSDFailure.h
branches/sage/pgs/messages/MOSDGetMap.h
branches/sage/pgs/msg/FakeMessenger.cc
branches/sage/pgs/msg/Message.cc
branches/sage/pgs/msg/Message.h
branches/sage/pgs/osd/OSD.cc
branches/sage/pgs/osd/OSDMap.h
branches/sage/pgs/osd/ObjectStore.h
branches/sage/pgs/osd/PG.cc
branches/sage/pgs/osdc/Objecter.cc

index 2321d7068063dbd9ca3165d30a5d7c35ffeb9ab8..9c2c757032eaf27ab0ea38274cf854c2e31dbb7d 100644 (file)
@@ -38,7 +38,8 @@ EBOFS_OBJS= \
        ebofs/BlockDevice.o\
        ebofs/BufferCache.o\
        ebofs/Ebofs.o\
-       ebofs/Allocator.o
+       ebofs/Allocator.o\
+       ebofs/FileJournal.o
 
 MDS_OBJS= \
        mds/MDS.o\
@@ -74,6 +75,7 @@ OSDC_OBJS= \
 MON_OBJS= \
        mon/Monitor.o\
        mon/Paxos.o\
+       mon/PaxosService.o\
        mon/OSDMonitor.o\
        mon/MDSMonitor.o\
        mon/ClientMonitor.o\
index d63633980022051a17841eeb01cb609068f8cab4..2a92d5834e7780895b5f8e74e5a250017f2466a9 100644 (file)
@@ -150,19 +150,12 @@ foreign rename
 - rejoin will need to explicitly resolve uncommitted items.
   - fully implement link/unlink first, and use that as a model?
 
-monitor
-- finish generic paxos
-
 osdmon
-- distribute w/ paxos framework
 - allow fresh replacement osds.  add osd_created in osdmap, probably
 - monitor needs to monitor some osds...
 - monitor pg states, notify on out?
 - watch osd utilization; adjust overload in cluster map
 
-mdsmon
-- distribute w/ paxos framework
-
 journaler
 - fix up for large events (e.g. imports)
 - use set_floor_and_read for safe takeover from possibly-not-quite-dead otherguy.
index 90e37a58a6ac2e1872d447e8cb0a55b24a0eda0b..11fd9149df92f577ddaff7caec9fe71753c1f7db 100644 (file)
@@ -95,13 +95,14 @@ public:
 
 // cons/des
 
-Client::Client(Messenger *m, MonMap *mm)
+Client::Client(Messenger *m, MonMap *mm) : timer(client_lock)
 {
   // which client am i?
   whoami = m->get_myname().num();
   monmap = mm;
 
   mounted = false;
+  mount_timeout_event = 0;
   unmounting = false;
 
   last_tid = 0;
@@ -875,21 +876,21 @@ void Client::handle_mds_map(MMDSMap* m)
   if (m->get_source().is_mds())
     frommds = m->get_source().num();
 
-  if (mdsmap == 0) 
+  if (mdsmap == 0) {
     mdsmap = new MDSMap;
 
-  if (whoami < 0) {
-    // mounted!
     assert(m->get_source().is_mon());
     whoami = m->get_dest().num();
     dout(1) << "handle_mds_map i am now " << m->get_dest() << endl;
     messenger->reset_myname(m->get_dest());
-
+    
     mount_cond.Signal();  // mount might be waiting for this.
-  }    
+  } 
 
   dout(1) << "handle_mds_map epoch " << m->get_epoch() << endl;
+  epoch_t was = mdsmap->get_epoch();
   mdsmap->decode(m->get_encoded());
+  assert(mdsmap->get_epoch() >= was);
   
   // send reconnect?
   if (frommds >= 0 && 
@@ -1252,7 +1253,28 @@ void Client::update_caps_wanted(Inode *in)
 
 
 // -------------------
-// fs ops
+// MOUNT
+
+void Client::_try_mount()
+{
+  dout(10) << "_try_mount" << endl;
+  int mon = monmap->pick_mon();
+  dout(2) << "sending client_mount to mon" << mon << endl;
+  messenger->send_message(new MClientMount(messenger->get_myaddr()), 
+                         monmap->get_inst(mon));
+
+  // schedule timeout
+  assert(mount_timeout_event == 0);
+  mount_timeout_event = new C_MountTimeout(this);
+  timer.add_event_after(g_conf.client_mount_timeout, mount_timeout_event);
+}
+
+void Client::_mount_timeout()
+{
+  dout(10) << "_mount_timeout" << endl;
+  mount_timeout_event = 0;
+  _try_mount();
+}
 
 int Client::mount()
 {
@@ -1260,14 +1282,15 @@ int Client::mount()
   assert(!mounted);  // caller is confused?
   assert(!mdsmap);
 
-  int mon = monmap->pick_mon();
-  dout(2) << "sending client_mount to mon" << mon << endl;
-  messenger->send_message(new MClientMount, monmap->get_inst(mon));
+  _try_mount();
   
   while (!mdsmap ||
         !osdmap || 
         osdmap->get_epoch() == 0)
     mount_cond.Wait(client_lock);
+
+  timer.cancel_event(mount_timeout_event);
+  mount_timeout_event = 0;
   
   mounted = true;
 
@@ -1291,6 +1314,9 @@ int Client::mount()
 }
 
 
+// UNMOUNT
+
+
 int Client::unmount()
 {
   client_lock.Lock();
@@ -1359,7 +1385,8 @@ int Client::unmount()
   // send unmount!
   int mon = monmap->pick_mon();
   dout(2) << "sending client_unmount to mon" << mon << endl;
-  messenger->send_message(new MClientUnmount, monmap->get_inst(mon));
+  messenger->send_message(new MClientUnmount(messenger->get_myinst()), 
+                         monmap->get_inst(mon));
   
   while (mounted)
     mount_cond.Wait(client_lock);
index 7e66efe7b6c85f64ce3378b70c5aaac96a6d83af..700ae3f1e11cc03e28061bc8b5db29160e5827b6 100644 (file)
 #include "include/interval_set.h"
 
 #include "common/Mutex.h"
+#include "common/Timer.h"
 
 #include "FileCache.h"
 
+
 // stl
 #include <set>
 #include <map>
@@ -333,6 +335,7 @@ class Client : public Dispatcher {
   MDSMap *mdsmap; 
   OSDMap *osdmap;
 
+  SafeTimer timer;
 
  protected:
   Messenger *messenger;  
@@ -574,6 +577,21 @@ protected:
 
   // ----------------------
   // fs ops.
+private:
+  void _try_mount();
+  void _mount_timeout();
+  Context *mount_timeout_event;
+
+  class C_MountTimeout : public Context {
+    Client *client;
+  public:
+    C_MountTimeout(Client *c) : client(c) { }
+    void finish(int r) {
+      if (r >= 0) client->_mount_timeout();
+    }
+  };
+
+public:
   int mount();
   int unmount();
 
index 19148942acc76608aa6cda8867764ee25788266a..34bd80f9a4d8f065efab34381e98b45f252712cf 100644 (file)
@@ -64,8 +64,13 @@ int main(int argc, char **argv, char *envp[]) {
   int r = monmap.read(".ceph_monmap");
   assert(r >= 0);
   
+  // start up network
+  rank.start_rank();
+  messenger = rank.register_entity(entity_name_t(entity_name_t::TYPE_ADMIN));
+  messenger->set_dispatcher(&dispatcher);
+  
   // build command
-  MMonCommand *m = new MMonCommand;
+  MMonCommand *m = new MMonCommand(messenger->get_myinst());
   string cmd;
   for (unsigned i=0; i<args.size(); i++) {
     if (i) cmd += " ";
@@ -76,11 +81,6 @@ int main(int argc, char **argv, char *envp[]) {
 
   dout(0) << "mon" << mon << " <- '" << cmd << "'" << endl;
 
-  // start up network
-  rank.start_rank();
-  messenger = rank.register_entity(entity_name_t(entity_name_t::TYPE_ADMIN));
-  messenger->set_dispatcher(&dispatcher);
-  
   // send it
   messenger->send_message(m, monmap.get_inst(mon));
 
index d6a21af1d03cd182377d6cc51acc1aa076d7e100..cef79bb9dd63e4f797669c09914f67c4d108d0f8 100644 (file)
@@ -101,7 +101,7 @@ md_config_t g_conf = {
 
   // --- clock ---
   clock_lock: false,
-  clock_tare: true,
+  clock_tare: false,
   
   // --- messenger ---
   ms_single_dispatch: false,
@@ -125,8 +125,13 @@ md_config_t g_conf = {
   // --- mon ---
   mon_tick_interval: 5,
   mon_osd_down_out_interval: 5,  // seconds
-  mon_lease: 2.000,  // seconds
-  mon_stop_with_last_mds: true,
+  mon_lease: 5,  // seconds    // lease interval
+  mon_lease_renew_interval: 3, // on leader, to renew the lease
+  mon_lease_ack_timeout: 10.0, // on leader, if lease isn't acked by all peons
+  mon_lease_timeout: 10.0,     // on peon, if lease isn't extended
+  mon_accept_timeout: 10.0,    // on leader, if paxos update isn't accepted
+  mon_stop_on_last_unmount: false,
+  mon_stop_with_last_mds: false,
 
   // --- client ---
   client_cache_size: 300,
@@ -142,6 +147,8 @@ md_config_t g_conf = {
   client_oc_max_dirty: 1024*1024* 5,    // MB * n
   client_oc_max_sync_write: 128*1024,   // writes >= this use wrlock
 
+  client_mount_timeout: 10.0,  // retry every N seconds
+
   client_hack_balance_reads: false,
 
   client_trace: 0,
@@ -194,7 +201,6 @@ md_config_t g_conf = {
   mds_trim_on_rejoin: true,
   mds_commit_on_shutdown: true,
   mds_shutdown_check: 0, //30,
-  mds_shutdown_on_last_unmount: true,
 
   mds_verify_export_dirauth: true,
 
@@ -600,8 +606,6 @@ void parse_config_options(std::vector<char*>& args)
       g_conf.mds_commit_on_shutdown = atoi(args[++i]);
     else if (strcmp(args[i], "--mds_shutdown_check") == 0) 
       g_conf.mds_shutdown_check = atoi(args[++i]);
-    else if (strcmp(args[i], "--mds_shutdown_on_last_unmount") == 0) 
-      g_conf.mds_shutdown_on_last_unmount = atoi(args[++i]);
     else if (strcmp(args[i], "--mds_log_flush_on_shutdown") == 0) 
       g_conf.mds_log_flush_on_shutdown = atoi(args[++i]);
 
@@ -663,6 +667,8 @@ void parse_config_options(std::vector<char*>& args)
 
     else if (strcmp(args[i], "--mon_osd_down_out_interval") == 0)
       g_conf.mon_osd_down_out_interval = atoi(args[++i]);
+    else if (strcmp(args[i], "--mon_stop_on_last_unmount") == 0) 
+      g_conf.mon_stop_on_last_unmount = atoi(args[++i]);
     else if (strcmp(args[i], "--mon_stop_with_last_mds") == 0)
       g_conf.mon_stop_with_last_mds = atoi(args[++i]);
 
index 7110a5ad6ea86f97daa750bda95d83a55f21de7f..0831e123ddbeea5c1c5119bfb7edae0f1655c94a 100644 (file)
@@ -115,6 +115,11 @@ struct md_config_t {
   int mon_tick_interval;
   int mon_osd_down_out_interval;
   float mon_lease;
+  float mon_lease_renew_interval;
+  float mon_lease_ack_timeout;
+  float mon_lease_timeout;
+  float mon_accept_timeout;
+  bool mon_stop_on_last_unmount;
   bool mon_stop_with_last_mds;
 
   // client
@@ -131,6 +136,8 @@ struct md_config_t {
   int      client_oc_max_dirty;
   size_t   client_oc_max_sync_write;
 
+  double   client_mount_timeout;
+
   // hack
   bool client_hack_balance_reads;
   
@@ -196,7 +203,6 @@ struct md_config_t {
   bool  mds_trim_on_rejoin;
   bool  mds_commit_on_shutdown;
   int   mds_shutdown_check;
-  bool  mds_shutdown_on_last_unmount;
 
   bool  mds_verify_export_dirauth;     // debug flag
 
index 800eacf5acd9a0151220cacf8eef3ca68a64a563..4f3c8ab71a19f667572db9141abd5c7441fcdfd7 100644 (file)
@@ -66,7 +66,8 @@ int main(int argc, char **argv)
   if (g_conf.clock_tare) g_clock.tare();
 
   // osd specific args
-  char *dev;
+  char *dev = 0;
+  char dev_default[20];
   int whoami = -1;
   for (unsigned i=0; i<args.size(); i++) {
     if (strcmp(args[i],"--dev") == 0) 
@@ -78,6 +79,13 @@ int main(int argc, char **argv)
       return -1;
     }
   }
+  if (whoami < 0) {
+    cerr << "must specify '--osd #' where # is the osd number" << endl;
+  }
+  if (!dev) {
+    sprintf(dev_default, "dev/osd%d", whoami);
+    dev = dev_default;
+  }
   cout << "dev " << dev << endl;
   
 
index 83c89bdcaef4af9492eed924942a16ade2c3b2c8..05c727be60ae68babda76c3b879ee6cd39eedc5b 100644 (file)
@@ -1,3 +1,5 @@
+OLD
+
 
 How Directory Committing Works:
 
index fe0c78331bd86b8543d1dab2996491d0b867572c..161eaf7428a537dc0b873fcd0d969b271be4ad3d 100644 (file)
@@ -114,6 +114,7 @@ we had the following partition:
 The subtree map on mds0 would be
 
  /     -> (/usr, /home)
+ /usr/local -> ()
  /home -> ()
 
 and on mds1:
index 29071fd5bd8daff04bff2ecd3e9fb061cac20f82..2c2b4ab18f092dd79ddc88edaac28e648dd28486 100644 (file)
@@ -639,7 +639,7 @@ int BlockDevice::_write(int fd, unsigned bno, unsigned num, bufferlist& bl)
     iov[n].iov_base = (void*)i->c_str();
     iov[n].iov_len = MIN(left, i->length());
 
-    assert((((unsigned long long)iov[n].iov_base) & 4095ULL) == 0);
+    assert((((intptr_t)iov[n].iov_base) & ((intptr_t)4095ULL)) == 0);
     assert((iov[n].iov_len & 4095) == 0);
     
     left -= iov[n].iov_len;
index 7d03dc074c30e534a8b34905dd6c6d0d9d3daed8..a30f031f10facc787041b72e8723573fdd00670a 100644 (file)
@@ -16,6 +16,8 @@
 
 #include "Ebofs.h"
 
+#include "FileJournal.h"
+
 #include <errno.h>
 
 #ifndef DARWIN
@@ -50,6 +52,7 @@ int Ebofs::mount()
   ebofs_lock.Lock();
   assert(!mounted);
 
+  // open dev
   int r = dev.open(&idle_kicker);
   if (r < 0) {
     ebofs_lock.Unlock();
@@ -79,6 +82,8 @@ int Ebofs::mount()
   dout(3) << "mount epoch " << super_epoch << endl;
   assert(super_epoch == sb->epoch);
 
+  super_fsid = sb->fsid;
+
   free_blocks = sb->free_blocks;
   limbo_blocks = sb->limbo_blocks;
 
@@ -101,6 +106,43 @@ int Ebofs::mount()
 
   allocator.release_limbo();
 
+  
+  // open journal?
+  if (journalfn) {
+    journal = new FileJournal(this, journalfn);
+    if (journal->open() < 0) {
+      dout(-3) << "mount journal " << journalfn << " open failed" << endl;
+      delete journal;
+      journal = 0;
+    } else {
+      dout(-3) << "mount journal " << journalfn << " opened, replaying" << endl;
+      
+      while (1) {
+       bufferlist bl;
+       epoch_t e;
+       if (!journal->read_entry(bl, e)) {
+         dout(-3) << "mount replay: end of journal, done." << endl;
+         break;
+       }
+
+       if (e < super_epoch) {
+         dout(-3) << "mount replay: skipping old entry in epoch " << e << " < " << super_epoch << endl;
+       }
+       if (e == super_epoch+1) {
+         super_epoch++;
+         dout(-3) << "mount replay: jumped to next epoch " << super_epoch << endl;
+       }
+       assert(e == super_epoch);
+       
+       dout(-3) << "mount replay: applying transaction in epoch " << e << endl;
+       Transaction t;
+       int off = 0;
+       t._decode(bl, off);
+       _apply_transaction(t);
+      }
+    }
+  }
+
   dout(3) << "mount starting commit+finisher threads" << endl;
   commit_thread.create();
   finisher_thread.create();
@@ -108,6 +150,7 @@ int Ebofs::mount()
   dout(1) << "mounted " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl;
   mounted = true;
 
+
   ebofs_lock.Unlock();
   return 0;
 }
@@ -126,6 +169,10 @@ int Ebofs::mkfs()
 
   block_t num_blocks = dev.get_num_blocks();
 
+  // make a super-random fsid
+  srand(time(0) ^ getpid());
+  super_fsid = (lrand48() << 32) ^ mrand48();
+
   free_blocks = 0;
   limbo_blocks = 0;
 
@@ -197,6 +244,18 @@ int Ebofs::mkfs()
 
   dev.close();
 
+
+  // create journal?
+  if (journalfn) {
+    journal = new FileJournal(this, journalfn);
+    if (journal->create() < 0) {
+      dout(3) << "mount journal " << journalfn << " created failed" << endl;
+      delete journal;
+    } else {
+      dout(3) << "mount journal " << journalfn << " created" << endl;
+    }
+  }
+
   dout(2) << "mkfs: " << dev.get_device_name() << " "  << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl;
   ebofs_lock.Unlock();
   return 0;
@@ -272,6 +331,7 @@ void Ebofs::prepare_super(version_t epoch, bufferptr& bp)
   // fill in super
   memset(&sb, 0, sizeof(sb));
   sb.s_magic = EBOFS_MAGIC;
+  sb.fsid = super_fsid;
   sb.epoch = epoch;
   sb.num_blocks = dev.get_num_blocks();
 
@@ -409,6 +469,7 @@ int Ebofs::commit_thread_entry()
               << ", max dirty " << g_conf.ebofs_bc_max_dirty
               << endl;
       
+      if (journal) journal->commit_epoch_start();
       
       // (async) write onodes+condes  (do this first; it currently involves inode reallocation)
       commit_inodes_start();
@@ -453,14 +514,14 @@ int Ebofs::commit_thread_entry()
         alloc_more_node_space();
       }
       
+      // signal journal
+      if (journal) journal->commit_epoch_finish();
+
       // kick waiters
       dout(10) << "commit_thread queueing commit + kicking sync waiters" << endl;
       
-      finisher_lock.Lock();
-      finisher_queue.splice(finisher_queue.end(), commit_waiters[super_epoch-1]);
+      queue_finishers(commit_waiters[super_epoch-1]);
       commit_waiters.erase(super_epoch-1);
-      finisher_cond.Signal();
-      finisher_lock.Unlock();
 
       sync_cond.Signal();
 
@@ -1222,7 +1283,18 @@ void Ebofs::sync(Context *onsafe)
   ebofs_lock.Lock();
   if (onsafe) {
     dirty = true;
-    commit_waiters[super_epoch].push_back(onsafe);
+
+    while (1) {
+      if (journal) {  
+       // journal empty transaction
+       Transaction t;
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   }
   ebofs_lock.Unlock();
 }
@@ -1994,6 +2066,29 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
   ebofs_lock.Lock();
   dout(7) << "apply_transaction start (" << t.ops.size() << " ops)" << endl;
 
+  unsigned r = _apply_transaction(t);
+
+  // journal, wait for commit
+  if (r != 0 && onsafe) {
+    delete onsafe;  // kill callback, but still journal below (in case transaction had side effects)
+    onsafe = 0;
+  }
+  while (1) {
+    if (journal) {
+      bufferlist bl;
+      t._encode(bl);
+      if (journal->submit_entry(bl, onsafe)) break; 
+    }
+    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    break;
+  }
+
+  ebofs_lock.Unlock();
+  return r;
+}
+
+unsigned Ebofs::_apply_transaction(Transaction& t)
+{
   // do ops
   unsigned r = 0;  // bit fields indicate which ops failed.
   int bit = 1;
@@ -2028,7 +2123,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     case Transaction::OP_GETATTR:
       {
         object_t oid = t.oids.front(); t.oids.pop_front();
-        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+       const char *attrname = t.get_attrname(); t.pop_attrname();
         pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
         if ((*(pattrval.second) = _getattr(oid, attrname, pattrval.first, *(pattrval.second))) < 0) {
           dout(7) << "apply_transaction fail on _getattr" << endl;
@@ -2095,7 +2190,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     case Transaction::OP_SETATTR:
       {
         object_t oid = t.oids.front(); t.oids.pop_front();
-        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+       const char *attrname = t.get_attrname(); t.pop_attrname();
         //pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
         bufferlist bl;
         bl.claim( t.attrbls.front() );
@@ -2121,7 +2216,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     case Transaction::OP_RMATTR:
       {
         object_t oid = t.oids.front(); t.oids.pop_front();
-        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+       const char *attrname = t.get_attrname(); t.pop_attrname();
         if (_rmattr(oid, attrname) < 0) {
           dout(7) << "apply_transaction fail on _rmattr" << endl;
           r &= bit;
@@ -2185,7 +2280,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     case Transaction::OP_COLL_SETATTR:
       {
         coll_t cid = t.cids.front(); t.cids.pop_front();
-        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+       const char *attrname = t.get_attrname(); t.pop_attrname();
         //pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
         bufferlist bl;
         bl.claim( t.attrbls.front() );
@@ -2201,7 +2296,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     case Transaction::OP_COLL_RMATTR:
       {
         coll_t cid = t.cids.front(); t.cids.pop_front();
-        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+       const char *attrname = t.get_attrname(); t.pop_attrname();
         if (_collection_rmattr(cid, attrname) < 0) {
           dout(7) << "apply_transaction fail on _collection_rmattr" << endl;
           r &= bit;
@@ -2217,16 +2312,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     bit = bit << 1;
   }
   
-  dout(7) << "apply_transaction finish (r = " << r << ")" << endl;
-  
-  // set up commit waiter
-  //if (r == 0) {
-  if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-  //} else {
-  //if (onsafe) delete onsafe;
-  //}
-  
-  ebofs_lock.Unlock();
+  dout(7) << "_apply_transaction finish (r = " << r << ")" << endl;
   return r;
 }
 
@@ -2295,36 +2381,6 @@ int Ebofs::_write(object_t oid, off_t offset, size_t length, bufferlist& bl)
 }
 
 
-/*int Ebofs::write(object_t oid, 
-                 off_t off, size_t len,
-                 bufferlist& bl, bool fsync)
-{
-  // wait?
-  if (fsync) {
-    // wait for flush.
-    Cond cond;
-    bool done;
-    int flush = 1;    // write never returns positive
-    Context *c = new C_Cond(&cond, &done, &flush);
-    int r = write(oid, off, len, bl, c);
-    if (r < 0) return r;
-    
-    ebofs_lock.Lock();
-    {
-      while (!done) 
-        cond.Wait(ebofs_lock);
-      assert(flush <= 0);
-    }
-    ebofs_lock.Unlock();
-    if (flush < 0) return flush;
-    return r;
-  } else {
-    // don't wait for flush.
-    return write(oid, off, len, bl, (Context*)0);
-  }
-}
-*/
-
 int Ebofs::write(object_t oid, 
                  off_t off, size_t len,
                  bufferlist& bl, Context *onsafe)
@@ -2338,7 +2394,17 @@ int Ebofs::write(object_t oid,
   // commit waiter
   if (r > 0) {
     assert((size_t)r == len);
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.write(oid, off, len, bl);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2370,9 +2436,19 @@ int Ebofs::remove(object_t oid, Context *onsafe)
   // do it
   int r = _remove(oid);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.remove(oid);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2445,9 +2521,19 @@ int Ebofs::truncate(object_t oid, off_t size, Context *onsafe)
   
   int r = _truncate(oid, size);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.truncate(oid, size);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2464,9 +2550,19 @@ int Ebofs::clone(object_t from, object_t to, Context *onsafe)
   
   int r = _clone(from, to);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.clone(from, to);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2640,9 +2736,19 @@ int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t siz
   ebofs_lock.Lock();
   int r = _setattr(oid, name, value, size);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.setattr(oid, name, value, size);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2673,9 +2779,19 @@ int Ebofs::setattrs(object_t oid, map<string,bufferptr>& attrset, Context *onsaf
   ebofs_lock.Lock();
   int r = _setattrs(oid, attrset);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.setattrs(oid, attrset);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2758,9 +2874,19 @@ int Ebofs::rmattr(object_t oid, const char *name, Context *onsafe)
 
   int r = _rmattr(oid, name);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.rmattr(oid, name);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2835,9 +2961,19 @@ int Ebofs::create_collection(coll_t cid, Context *onsafe)
 
   int r = _create_collection(cid);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.create_collection(cid);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2882,9 +3018,19 @@ int Ebofs::destroy_collection(coll_t cid, Context *onsafe)
 
   int r = _destroy_collection(cid);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.remove_collection(cid);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2936,9 +3082,19 @@ int Ebofs::collection_add(coll_t cid, object_t oid, Context *onsafe)
 
   int r = _collection_add(cid, oid);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.collection_add(cid, oid);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2977,9 +3133,19 @@ int Ebofs::collection_remove(coll_t cid, object_t oid, Context *onsafe)
 
   int r = _collection_remove(cid, oid);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.collection_remove(cid, oid);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3040,9 +3206,19 @@ int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, s
 
   int r = _collection_setattr(cid, name, value, size);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.collection_setattr(cid, name, value, size);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3098,9 +3274,19 @@ int Ebofs::collection_rmattr(coll_t cid, const char *name, Context *onsafe)
 
   int r = _collection_rmattr(cid, name);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.collection_rmattr(cid, name);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
index 91c5eb51b3cda6afb7837039120eb30bdff9e0ee..eb20cf89205316e5476a4a76577951a317dd3b53 100644 (file)
@@ -29,6 +29,7 @@ using namespace __gnu_cxx;
 #include "nodes.h"
 #include "Allocator.h"
 #include "Table.h"
+#include "Journal.h"
 
 #include "common/Mutex.h"
 #include "common/Cond.h"
@@ -40,20 +41,23 @@ typedef pair<coll_t,object_t> coll_object_t;
 
 
 class Ebofs : public ObjectStore {
- protected:
+protected:
   Mutex        ebofs_lock;    // a beautiful global lock
 
   // ** debuggy **
   bool         fake_writes;
 
   // ** super **
+public:
   BlockDevice  dev;
+protected:
   bool         mounted, unmounting, dirty;
   bool         readonly;
   version_t    super_epoch;
   bool         commit_thread_started, mid_commit;
   Cond         commit_cond;   // to wake up the commit thread
   Cond         sync_cond;
+  uint64_t     super_fsid;
 
   map<version_t, list<Context*> > commit_waiters;
 
@@ -71,9 +75,16 @@ class Ebofs : public ObjectStore {
     }
   } commit_thread;
 
-  
+public:
+  uint64_t get_fsid() { return super_fsid; }
+  epoch_t get_super_epoch() { return super_epoch; }
+protected:
 
 
+  // ** journal **
+  char *journalfn;
+  Journal *journal;
+
   // ** allocator **
   block_t      free_blocks, limbo_blocks;
   Allocator    allocator;
@@ -188,6 +199,21 @@ class Ebofs : public ObjectStore {
   bool           finisher_stop;
   list<Context*> finisher_queue;
 
+public:
+  void queue_finisher(Context *c) {
+    finisher_lock.Lock();
+    finisher_queue.push_back(c);
+    finisher_cond.Signal();
+    finisher_lock.Unlock();
+  }
+  void queue_finishers(list<Context*>& ls) {
+    finisher_lock.Lock();
+    finisher_queue.splice(finisher_queue.end(), ls);
+    finisher_cond.Signal();
+    finisher_lock.Unlock();
+  }
+protected:
+
   void *finisher_thread_entry();
   class FinisherThread : public Thread {
     Ebofs *ebofs;
@@ -204,12 +230,13 @@ class Ebofs : public ObjectStore {
 
 
  public:
-  Ebofs(char *devfn) : 
+  Ebofs(char *devfn, char *jfn=0) : 
     fake_writes(false),
     dev(devfn), 
     mounted(false), unmounting(false), dirty(false), readonly(false), 
     super_epoch(0), commit_thread_started(false), mid_commit(false),
     commit_thread(this),
+    journalfn(jfn), journal(0),
     free_blocks(0), limbo_blocks(0),
     allocator(this),
     nodepool(ebofs_lock),
@@ -222,6 +249,11 @@ class Ebofs : public ObjectStore {
     finisher_stop(false), finisher_thread(this) {
     for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
       free_tab[i] = 0;
+    if (!journalfn) {
+      journalfn = new char[strlen(devfn) + 100];
+      strcpy(journalfn, devfn);
+      strcat(journalfn, ".journal");
+    }
   }
   ~Ebofs() {
   }
@@ -298,6 +330,8 @@ class Ebofs : public ObjectStore {
 
 private:
   // private interface -- use if caller already holds lock
+  unsigned _apply_transaction(Transaction& t);
+
   int _read(object_t oid, off_t off, size_t len, bufferlist& bl);
   int _is_cached(object_t oid, off_t off, size_t len);
   int _stat(object_t oid, struct stat *st);
index 87ea20199cd24625eed31811a0160f5b103efa2d..74edecf41c71a023ebc5b36785e3bc80f2a3e041 100644 (file)
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 // vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
 
 #include "FileJournal.h"
 #include "Ebofs.h"
 
-#include "config.h"
-#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ").journal "
-#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ").journal "
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
 
+#include "config.h"
+#undef dout
+#define dout(x) if (true || x <= g_conf.debug_ebofs) cout << "ebofs(" << ebofs->dev.get_device_name() << ").journal "
+#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << ebofs->dev.get_device_name() << ").journal "
 
 
-void FileJournal::create()
+int FileJournal::create()
 {
   dout(1) << "create " << fn << endl;
 
   // open/create
-  fd = ::open(fn.c_str(), O_CREAT|O_WRONLY);
+  fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
+  if (fd < 0) {
+    dout(1) << "create failed " << errno << " " << strerror(errno) << endl;
+    return -errno;
+  }
   assert(fd > 0);
 
-  ::ftruncate(fd);
-  ::fchmod(fd, 0644);
+  //::ftruncate(fd, 0);
+  //::fchmod(fd, 0644);
+
+  // get size
+  struct stat st;
+  ::fstat(fd, &st);
+  dout(1) << "open " << fn << " " << st.st_size << " bytes" << endl;
+
+  // write empty header
+  header.clear();
+  header.fsid = ebofs->get_fsid();
+  header.max_size = st.st_size;
+  write_header();
+  
+  // writeable.
+  read_pos = 0;
+  write_pos = queue_pos = sizeof(header);
 
   ::close(fd);
-}
 
+  return 0;
+}
 
-void FileJournal::open()
+int FileJournal::open()
 {
-  dout(1) << "open " << fn << endl;
+  //dout(1) << "open " << fn << endl;
 
   // open and file
   assert(fd == 0);
-  fd = ::open(fn.c_str(), O_RDWR);
+  fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
+  if (fd < 0) {
+    dout(1) << "open failed " << errno << " " << strerror(errno) << endl;
+    return -errno;
+  }
   assert(fd > 0);
 
-  // read header?
-  // ***
+  // assume writeable, unless...
+  read_pos = 0;
+  write_pos = queue_pos = sizeof(header);
 
+  // read header?
+  read_header();
+  if (header.num > 0 && header.fsid == ebofs->get_fsid()) {
+    // valid header, pick an offset
+    for (int i=0; i<header.num; i++) {
+      if (header.epoch[i] == ebofs->get_super_epoch()) {
+       dout(2) << "using read_pos header pointer "
+               << header.epoch[i] << " at " << header.offset[i]
+               << endl;
+       read_pos = header.offset[i];
+       write_pos = queue_pos = 0;
+       break;
+      }      
+      else if (header.epoch[i] < ebofs->get_super_epoch()) {
+       dout(2) << "super_epoch is " << ebofs->get_super_epoch() 
+               << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
+               << endl;
+      }
+      else if (header.epoch[i] > ebofs->get_super_epoch()) {
+       dout(2) << "super_epoch is " << ebofs->get_super_epoch() 
+               << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
+               << endl;
+       break;
+      }
+    }
+  }
 
   start_writer();
+
+  return 0;
 }
 
 void FileJournal::close()
@@ -49,7 +119,8 @@ void FileJournal::close()
   stop_writer();
 
   // close
-  assert(q.empty());
+  assert(writeq.empty());
+  assert(commitq.empty());
   assert(fd > 0);
   ::close(fd);
   fd = 0;
@@ -73,12 +144,36 @@ void FileJournal::stop_writer()
 }
 
 
+void FileJournal::print_header()
+{
+  for (int i=0; i<header.num; i++) {
+    if (i && header.offset[i] < header.offset[i-1]) {
+      assert(header.wrap);
+      dout(10) << "header: wrap at " << header.wrap << endl;
+    }
+    dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << endl;
+  }
+  //if (header.wrap) dout(10) << "header: wrap at " << header.wrap << endl;
+}
+void FileJournal::read_header()
+{
+  dout(10) << "read_header" << endl;
+  memset(&header, 0, sizeof(header));  // zero out (read may fail)
+  ::lseek(fd, 0, SEEK_SET);
+  int r = ::read(fd, &header, sizeof(header));
+  if (r < 0) 
+    dout(0) << "read_header error " << errno << " " << strerror(errno) << endl;
+  print_header();
+}
 void FileJournal::write_header()
 {
-  dout(10) << "write_header" << endl;
-  
+  dout(10) << "write_header " << endl;
+  print_header();
+
   ::lseek(fd, 0, SEEK_SET);
-  ::write(fd, &header, sizeof(header));
+  int r = ::write(fd, &header, sizeof(header));
+  if (r < 0) 
+    dout(0) << "write_header error " << errno << " " << strerror(errno) << endl;
 }
 
 
@@ -91,6 +186,7 @@ void FileJournal::write_thread_entry()
     if (writeq.empty()) {
       // sleep
       dout(20) << "write_thread_entry going to sleep" << endl;
+      assert(write_pos == queue_pos);
       write_cond.Wait(write_lock);
       dout(20) << "write_thread_entry woke up" << endl;
       continue;
@@ -99,24 +195,35 @@ void FileJournal::write_thread_entry()
     // do queued writes
     while (!writeq.empty()) {
       // grab next item
-      epoch_t e = writeq.front().first;
+      epoch_t epoch = writeq.front().first;
       bufferlist bl;
       bl.claim(writeq.front().second);
       writeq.pop_front();
       Context *oncommit = commitq.front();
       commitq.pop_front();
       
-      dout(15) << "write_thread_entry writing " << bottom << " : " 
+      // wrap?
+      if (write_pos == header.wrap) {
+       dout(15) << "write_thread_entry wrapped write_pos at " << write_pos << " to " << sizeof(header_t) << endl;
+       assert(header.wrap == write_pos);
+       write_header();
+       write_pos = sizeof(header_t);
+      }
+
+      // write!
+      dout(15) << "write_thread_entry writing " << write_pos << " : " 
               << bl.length() 
-              << " epoch " << e
+              << " epoch " << epoch
               << endl;
       
-      // write epoch, len, data.
-      ::fseek(fd, bottom, SEEK_SET);
-      ::write(fd, &e, sizeof(e));
-      
-      uint32_t len = bl.length();
-      ::write(fd, &len, sizeof(len));
+      // write entry header
+      entry_header_t h;
+      h.epoch = epoch;
+      h.len = bl.length();
+      h.make_magic(write_pos, header.fsid);
+
+      ::lseek(fd, write_pos, SEEK_SET);
+      ::write(fd, &h, sizeof(h));
       
       for (list<bufferptr>::const_iterator it = bl.buffers().begin();
           it != bl.buffers().end();
@@ -124,14 +231,21 @@ void FileJournal::write_thread_entry()
        if ((*it).length() == 0) continue;  // blank buffer.
        ::write(fd, (char*)(*it).c_str(), (*it).length() );
       }
+
+      ::write(fd, &h, sizeof(h));
       
       // move position pointer
-      bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length();
+      write_pos += 2*sizeof(entry_header_t) + bl.length();
       
-      // do commit callback
       if (oncommit) {
-       oncommit->finish(0);
-       delete oncommit;
+       if (1) {
+         // queue callback
+         ebofs->queue_finisher(oncommit);
+       } else {
+         // callback now
+         oncommit->finish(0);
+         delete oncommit;
+       }
       }
     }
   }
@@ -140,61 +254,202 @@ void FileJournal::write_thread_entry()
   dout(10) << "write_thread_entry finish" << endl;
 }
 
-void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
+bool FileJournal::submit_entry(bufferlist& e, Context *oncommit)
 {
-  dout(10) << "submit_entry " << bottom << " : " << e.length()
-          << " epoch " << ebofs->super_epoch
+  assert(queue_pos != 0);  // bad create(), or journal didn't replay to completion.
+
+  // ** lock **
+  Mutex::Locker locker(write_lock);
+
+  // wrap? full?
+  off_t size = 2*sizeof(entry_header_t) + e.length();
+
+  if (full) return false;  // already marked full.
+
+  if (header.wrap) {
+    // we're wrapped.  don't overwrite ourselves.
+    if (queue_pos + size >= header.offset[0]) {
+      dout(10) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
+              << " >= " << header.offset[0]
+              << endl;
+      full = true;
+      print_header();
+      return false;      
+    }
+  } else {
+    // we haven't wrapped.  
+    if (queue_pos + size >= header.max_size) {
+      // is there room if we wrap?
+      if ((off_t)sizeof(header_t) + size < header.offset[0]) {
+       // yes!
+       dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << endl;
+       header.wrap = queue_pos;
+       queue_pos = sizeof(header_t);
+       header.push(ebofs->get_super_epoch(), queue_pos);
+      } else {
+       // no room.
+       dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
+                << " >= " << header.max_size
+                << endl;
+       full = true;
+       return false;
+      }
+    }
+  }
+  
+  dout(10) << "submit_entry " << queue_pos << " : " << e.length()
+          << " epoch " << ebofs->get_super_epoch()
           << " " << oncommit << endl;
   
   // dump on queue
-  writeq.push_back(pair<epoch_t,bufferlist>(ebofs->super_epoch, e));
+  writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
   commitq.push_back(oncommit);
-
+  
+  queue_pos += size;
+  
   // kick writer thread
   write_cond.Signal();
+
+  return true;
 }
 
 
 void FileJournal::commit_epoch_start()
 {
-  dout(10) << "commit_epoch_start" << endl;
+  dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1 
+          << " -- new epoch " << ebofs->get_super_epoch()
+          << endl;
 
-  write_lock.Lock();
-  {
-    header.epoch2 = ebofs->super_epoch;
-    header.top2 = bottom;
-    write_header();
-  }
-  write_lock.Unlock();
+  Mutex::Locker locker(write_lock);
+
+  // was full -> empty -> now usable?
+  if (full) {
+    if (header.num != 0) {
+      dout(1) << " journal FULL, ignoring this epoch" << endl;
+      return;
+    }
+    
+    dout(1) << " clearing FULL flag, journal now usable" << endl;
+    full = false;
+  } 
+
+  // note epoch boundary
+  header.push(ebofs->get_super_epoch(), queue_pos);  // note: these entries may not yet be written.
+  //write_header();  // no need to write it now, though...
 }
 
 void FileJournal::commit_epoch_finish()
 {
-  dout(10) << "commit_epoch_finish" << endl;
+  dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << endl;
 
   write_lock.Lock();
   {
-    // update header
-    header.epoch1 = ebofs->super_epoch;
-    header.top1 = header.top2;
-    header.epoch2 = 0;
-    header.top2 = 0;
+    if (full) {
+      // full journal damage control.
+      dout(15) << " journal was FULL, contents now committed, clearing header.  journal still not usable until next epoch." << endl;
+      header.clear();
+      write_pos = queue_pos = sizeof(header_t);
+    } else {
+      // update header -- trim/discard old (committed) epochs
+      while (header.epoch[0] < ebofs->get_super_epoch())
+       header.pop();
+    }
     write_header();
 
-    // flush any unwritten items in previous epoch
-    while (!writeq.empty() &&
-          writeq.front().first < ebofs->super_epoch) {
-      dout(15) << " dropping uncommitted journal item from prior epoch" << endl;
-      writeq.pop_front();
+    // discard any unwritten items in previous epoch, and do callbacks
+    epoch_t epoch = ebofs->get_super_epoch();
+    list<Context*> callbacks;
+    while (!writeq.empty() && writeq.front().first < epoch) {
+      dout(15) << " dropping unwritten and committed " 
+              << write_pos << " : " << writeq.front().second.length()
+              << " epoch " << writeq.front().first 
+              << endl;
+      // finisher?
       Context *oncommit = commitq.front();
+      if (oncommit) callbacks.push_back(oncommit);
+
+      write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length();
+
+      // discard.
+      writeq.pop_front();  
       commitq.pop_front();
-         
-      if (oncommit) {
-       oncommit->finish(0);
-       delete oncommit;
-      }
     }
+    
+    // queue the finishers
+    ebofs->queue_finishers(callbacks);
   }
   write_lock.Unlock();
   
 }
+
+
+void FileJournal::make_writeable()
+{
+  if (read_pos)
+    write_pos = queue_pos = read_pos;
+  else
+    write_pos = queue_pos = sizeof(header_t);
+  read_pos = 0;
+}
+
+
+bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
+{
+  if (!read_pos) {
+    dout(1) << "read_entry -- not readable" << endl;
+    make_writeable();
+    return false;
+  }
+
+  if (read_pos == header.wrap) {
+    // find wrap point
+    for (int i=1; i<header.num; i++) {
+      if (header.offset[i] < read_pos) {
+       assert(header.offset[i-1] < read_pos);
+       read_pos = header.offset[i];
+       break;
+      }
+    }
+    assert(read_pos != header.wrap);
+    dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << endl;
+  }
+
+  // header
+  entry_header_t h;
+  ::lseek(fd, read_pos, SEEK_SET);
+  ::read(fd, &h, sizeof(h));
+  if (!h.check_magic(read_pos, header.fsid)) {
+    dout(1) << "read_entry " << read_pos << " : bad header magic, end of journal" << endl;
+    make_writeable();
+    return false;
+  }
+
+  // body
+  bufferptr bp(h.len);
+  ::read(fd, bp.c_str(), h.len);
+
+  // footer
+  entry_header_t f;
+  ::read(fd, &f, sizeof(h));
+  if (!f.check_magic(read_pos, header.fsid) ||
+      h.epoch != f.epoch ||
+      h.len != f.len) {
+    dout(1) << "read_entry " << read_pos << " : bad footer magic, partially entry, end of journal" << endl;
+    make_writeable();
+    return false;
+  }
+
+
+  // yay!
+  dout(1) << "read_entry " << read_pos << " : " 
+         << " " << h.len << " bytes"
+         << " epoch " << h.epoch 
+         << endl;
+  
+  bl.push_back(bp);
+  epoch = h.epoch;
+
+  read_pos += 2*sizeof(entry_header_t) + h.len;
+
+  return true;
+}
index 6c34f24f339559a425df42d7036f8803060f9454..a26f75ec97ff6b6c390cd3c3b6c82067b4da29a0 100644 (file)
 
 
 #include "Journal.h"
-
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Thread.h"
 
 class FileJournal : public Journal {
 public:
+  /** log header
+   * we allow 3 pointers:
+   *  top/initial,
+   *  one for an epoch boundary,
+   *  and one for a wrap in the ring buffer/journal file.
+   * the epoch boundary one is useful only for speedier recovery in certain cases
+   * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
+   */
   struct header_t {
-    epoch_t epoch1;
-    off_t top1;
-    epoch_t epoch2;
-    off_t top2;
+    uint64_t fsid;
+    int num;
+    off_t wrap;
+    off_t max_size;
+    epoch_t epoch[3];
+    off_t offset[3];
+
+    header_t() : fsid(0), num(0), wrap(0), max_size(0) {}
+
+    void clear() {
+      num = 0;
+      wrap = 0;
+    }
+    void pop() {
+      if (num >= 2 && offset[0] > offset[1]) 
+       wrap = 0;  // we're eliminating a wrap
+      num--;
+      for (int i=0; i<num; i++) {
+       epoch[i] = epoch[i+1];
+       offset[i] = offset[i+1];
+      }
+    }
+    void push(epoch_t e, off_t o) {
+      assert(num < 3);
+      epoch[num] = e;
+      offset[num] = o;
+      num++;
+    }
   } header;
 
+  struct entry_header_t {
+    uint64_t epoch;
+    uint64_t len;
+    uint64_t magic1;
+    uint64_t magic2;
+    
+    void make_magic(off_t pos, uint64_t fsid) {
+      magic1 = pos;
+      magic2 = fsid ^ epoch ^ len;
+    }
+    bool check_magic(off_t pos, uint64_t fsid) {
+      return
+       magic1 == (uint64_t)pos &&
+       magic2 == (fsid ^ epoch ^ len);
+    }
+  };
+
 private:
   string fn;
 
-  off_t max_size;
-  off_t top;            // byte of first entry chronologically
-  off_t bottom;         // byte where next entry goes
-  off_t committing_to;  // offset of epoch boundary, if we are committing
+  bool full;
+  off_t write_pos;      // byte where next entry written goes
+  off_t queue_pos;      // byte where next entry queued for write goes
+
+  off_t read_pos;       // 
 
   int fd;
 
@@ -47,39 +99,44 @@ private:
   Cond write_cond;
   bool write_stop;
 
+  void print_header();
+  void read_header();
   void write_header();
   void start_writer();
   void stop_writer();
   void write_thread_entry();
 
+  void make_writeable();
+
   class Writer : public Thread {
     FileJournal *journal;
   public:
     Writer(FileJournal *fj) : journal(fj) {}
     void *entry() {
-      journal->write_thread();
+      journal->write_thread_entry();
       return 0;
     }
   } write_thread;
 
  public:
-  FileJournal(Ebofs *e, char *f, off_t sz) : 
-    Journal(e),
-    fn(f), max_size(sz),
-    top(0), bottom(0), committing_to(0),
+  FileJournal(Ebofs *e, char *f) : 
+    Journal(e), fn(f),
+    full(false),
+    write_pos(0), queue_pos(0), read_pos(0),
     fd(0),
-    write_stop(false), write_thread(this)
-  { }
+    write_stop(false), write_thread(this) { }
   ~FileJournal() {}
 
-  void create();
-  void open();
+  int create();
+  int open();
   void close();
 
   // writes
-  void submit_entry(bufferlist& e, Context *oncommit);  // submit an item
-  void commit_epoch_start();  // mark epoch boundary
-  void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
+  bool submit_entry(bufferlist& e, Context *oncommit);  // submit an item
+  void commit_epoch_start();   // mark epoch boundary
+  void commit_epoch_finish();  // mark prior epoch as committed (we can expire)
+
+  bool read_entry(bufferlist& bl, epoch_t& e);
 
   // reads
 };
index c05bce5955c5fdb8ddaac9ef19ede19099e0a23f..fb1983c22eafc1672954fd1a745cdb498e4bd10f 100644 (file)
 #ifndef __EBOFS_JOURNAL_H
 #define __EBOFS_JOURNAL_H
 
+class Ebofs;
+
+#include "include/buffer.h"
+#include "include/Context.h"
 
 class Journal {
+protected:
   Ebofs *ebofs;
 
- public:
+public:
   Journal(Ebofs *e) : ebofs(e) { }
   virtual ~Journal() { }
 
-  virtual void create() = 0;
-  virtual void open() = 0;
+  virtual int create() = 0;
+  virtual int open() = 0;
   virtual void close() = 0;
 
   // writes
-  virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
+  virtual bool submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
   virtual void commit_epoch_start() = 0;  // mark epoch boundary
-  virtual void commit_epoch_finish(list<Context*>& ls) = 0; // mark prior epoch as committed (we can expire)
+  virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire)
+  virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
 
   // reads/recovery
   
index 704ec1658182421fcf9497b6ca52fdce50704732..345f49b7a68ca35f94423b4d18964f08addf9723 100644 (file)
@@ -145,6 +145,7 @@ int main(int argc, char **argv)
   char *filename = args[0];
   int seconds = atoi(args[1]);
   int threads = atoi(args[2]);
+  if (!threads) threads = 1;
 
   cout << "dev " << filename << " .. " << threads << " threads .. " << seconds << " seconds" << endl;
 
@@ -153,7 +154,7 @@ int main(int argc, char **argv)
 
 
   // explicit tests
-  if (1) {
+  if (0) {
     // verify that clone() plays nice with partial writes
     object_t oid(1,1);
     bufferptr bp(10000);
index b03bb8a40d9c9a123cded4558aa4c2b778d3a60f..1fa209a3deeb9c67832e0ac82cc6b18ade92979a 100644 (file)
@@ -142,15 +142,16 @@ static const int EBOFS_FREE_BUCKET_BITS = 2;
 
 
 struct ebofs_super {
-  unsigned s_magic;
-  
-  unsigned epoch;             // version of this superblock.
+  uint64_t s_magic;
+  uint64_t fsid;
+
+  epoch_t epoch;             // version of this superblock.
 
-  unsigned num_blocks;        /* # blocks in filesystem */
+  uint64_t num_blocks;        /* # blocks in filesystem */
 
   // some basic stats, for kicks
-  unsigned free_blocks;       /* unused blocks */
-  unsigned limbo_blocks;      /* limbo blocks */
+  uint64_t free_blocks;       /* unused blocks */
+  uint64_t limbo_blocks;      /* limbo blocks */
   //unsigned num_objects;
   //unsigned num_fragmented;
   
index 7f2f30104407b8b5e39cfd5ab81289ed5fb0bd3a..d73a1d8c739b66180c0a438f69f433e23d533fc7 100644 (file)
@@ -44,11 +44,14 @@ inline void finish_contexts(std::list<Context*>& finished,
   using std::cout;
   using std::endl;
   
+  list<Context*> ls;
   if (finished.empty()) return;
 
-  dout(10) << finished.size() << " contexts to finish with " << result << endl;
-  for (std::list<Context*>::iterator it = finished.begin(); 
-       it != finished.end(); 
+  ls.swap(finished); // swap out of place to avoid weird loops
+
+  dout(10) << ls.size() << " contexts to finish with " << result << endl;
+  for (std::list<Context*>::iterator it = ls.begin(); 
+       it != ls.end(); 
        it++) {
     Context *c = *it;
     dout(10) << "---- " << c << endl;
index 1f401513f688c90a2bf726477083827ac8e1cca9..1f61d3b892ba5d025047785fd8076f69dbfa5d7a 100644 (file)
@@ -919,6 +919,14 @@ inline void _decode(std::string& s, bufferlist& bl, int& off)
   off += len+1;
 }
 
+// const char* (encode only, string compatible)
+inline void _encode(const char *s, bufferlist& bl) 
+{
+  uint32_t len = strlen(s);
+  _encoderaw(len, bl);
+  bl.append(s, len+1);
+}
+
 // bufferptr (encapsulated)
 inline void _encode(bufferptr& bp, bufferlist& bl) 
 {
index e4b0d21d6959b5f35eeafac7868d7d89ce1ebdc0..f2e40bff824a49d375a672642a5eb92582496042 100644 (file)
@@ -255,11 +255,8 @@ public:
 int MDS::init(bool standby)
 {
   mds_lock.Lock();
-
-  if (standby)
-    want_state = MDSMap::STATE_STANDBY;
-  else
-    want_state = MDSMap::STATE_STARTING;
+  
+  want_state = MDSMap::STATE_BOOT;
   
   // starting beacon.  this will induce an MDSMap from the monitor
   beacon_start();
@@ -367,7 +364,7 @@ void MDS::beacon_send()
   beacon_seq_stamp[beacon_last_seq] = g_clock.now();
   
   int mon = monmap->pick_mon();
-  messenger->send_message(new MMDSBeacon(want_state, beacon_last_seq),
+  messenger->send_message(new MMDSBeacon(messenger->get_myinst(), want_state, beacon_last_seq),
                          monmap->get_inst(mon));
 
   // schedule next sender
@@ -481,7 +478,7 @@ void MDS::handle_mds_map(MMDSMap *m)
   mdsmap->decode(m->get_encoded());
   
   // see who i am
-  whoami = mdsmap->get_inst_rank(messenger->get_myaddr());
+  whoami = mdsmap->get_addr_rank(messenger->get_myaddr());
   if (oldwhoami != whoami) {
     // update messenger.
     messenger->reset_myname(MSG_ADDR_MDS(whoami));
index 8c47e9ed096818d5287995f4adcb6ffc23e123d3..46d856316740c0b8360538993c618b81ccb89d28 100644 (file)
@@ -33,16 +33,17 @@ class MDSMap {
   static const int STATE_OUT =       1;  // down, once existed, but no subtrees, empty log.
   static const int STATE_FAILED =    2;  // down, active subtrees; needs to be recovered.
 
-  static const int STATE_STANDBY  =  3;  // up, but inactive.  waiting for assignment by monitor.
-  static const int STATE_CREATING  = 4;  // up, creating MDS instance (new journal, idalloc..)
-  static const int STATE_STARTING  = 5;  // up, starting prior out MDS instance.
-  static const int STATE_REPLAY    = 6;  // up, scanning journal, recoverying any shared state
-  static const int STATE_RESOLVE   = 7;  // up, disambiguating partial distributed operations (import/export, ...rename?)
-  static const int STATE_RECONNECT = 8;  // up, reconnect to clients
-  static const int STATE_REJOIN    = 9;  // up, replayed journal, rejoining distributed cache
-  static const int STATE_ACTIVE =    10; // up, active
-  static const int STATE_STOPPING  = 11; // up, exporting metadata (-> standby or out)
-  static const int STATE_STOPPED   = 12; // up, finished stopping.  like standby, but not avail to takeover.
+  static const int STATE_BOOT     =  3;  // up, started, joining cluster.
+  static const int STATE_STANDBY  =  4;  // up, but inactive.  waiting for assignment by monitor.
+  static const int STATE_CREATING  = 5;  // up, creating MDS instance (new journal, idalloc..)
+  static const int STATE_STARTING  = 6;  // up, starting prior out MDS instance.
+  static const int STATE_REPLAY    = 7;  // up, scanning journal, recoverying any shared state
+  static const int STATE_RESOLVE   = 8;  // up, disambiguating partial distributed operations (import/export, ...rename?)
+  static const int STATE_RECONNECT = 9;  // up, reconnect to clients
+  static const int STATE_REJOIN    = 10; // up, replayed journal, rejoining distributed cache
+  static const int STATE_ACTIVE =    11; // up, active
+  static const int STATE_STOPPING  = 12; // up, exporting metadata (-> standby or out)
+  static const int STATE_STOPPED   = 13; // up, finished stopping.  like standby, but not avail to takeover.
   
   static const char *get_state_name(int s) {
     switch (s) {
@@ -51,6 +52,7 @@ class MDSMap {
     case STATE_OUT:       return "down:out";
     case STATE_FAILED:    return "down:failed";
       // up
+    case STATE_BOOT:      return "up:boot";
     case STATE_STANDBY:   return "up:standby";
     case STATE_CREATING:  return "up:creating";
     case STATE_STARTING:  return "up:starting";
@@ -170,6 +172,7 @@ class MDSMap {
   bool is_out(int m)      { return mds_state.count(m) && mds_state[m] == STATE_OUT; }
   bool is_failed(int m)    { return mds_state.count(m) && mds_state[m] == STATE_FAILED; }
 
+  bool is_boot(int m)  { return mds_state.count(m) && mds_state[m] == STATE_BOOT; }
   bool is_standby(int m)  { return mds_state.count(m) && mds_state[m] == STATE_STANDBY; }
   bool is_creating(int m) { return mds_state.count(m) && mds_state[m] == STATE_CREATING; }
   bool is_starting(int m) { return mds_state.count(m) && mds_state[m] == STATE_STARTING; }
@@ -226,7 +229,7 @@ class MDSMap {
     return false;
   }
   
-  int get_inst_rank(const entity_addr_t& addr) {
+  int get_addr_rank(const entity_addr_t& addr) {
     for (map<int,entity_inst_t>::iterator p = mds_inst.begin();
         p != mds_inst.end();
         ++p) {
index c3bc0009118357f162beb94280e841a5a76bfece..d083d72833830a885c62e74358bfd14ebe05076f 100644 (file)
 
 class MClientMount : public Message {
 public:
+  entity_addr_t addr;
+
   MClientMount() : Message(MSG_CLIENT_MOUNT) { }
+  MClientMount(entity_addr_t a) : 
+    Message(MSG_CLIENT_MOUNT),
+    addr(a) { }
 
   char *get_type_name() { return "client_mount"; }
 
-  void decode_payload() { }
-  void encode_payload() { }
+  void decode_payload() { 
+    int off = 0;
+    ::_decode(addr, payload, off);
+  }
+  void encode_payload() { 
+    ::_encode(addr, payload);
+  }
 };
 
 #endif
index e8acc50f190e0ae9b5b451cd1ec81ccecedca12a..42fa07db7ba05dd644ad242432fbed6594dd36c4 100644 (file)
 
 class MClientUnmount : public Message {
 public:
+  entity_inst_t inst;
+  
   MClientUnmount() : Message(MSG_CLIENT_UNMOUNT) { }
-
+  MClientUnmount(entity_inst_t i) : 
+    Message(MSG_CLIENT_UNMOUNT),
+    inst(i) { }
+  
   char *get_type_name() { return "client_unmount"; }
 
-  void decode_payload() { }
-  void encode_payload() { }
+  void decode_payload() { 
+    int off = 0;
+    ::_decode(inst, payload, off);
+  }
+  void encode_payload() { 
+    ::_encode(inst, payload);
+  }
 };
 
 #endif
index 4789c809572c4a4732169b6f76053efcab9df5f9..d8b73a45a31227ece07d196a68504cb83f17feb7 100644 (file)
 #include "mds/MDSMap.h"
 
 class MMDSBeacon : public Message {
+  entity_inst_t inst;
   int state;
   version_t seq;
 
  public:
   MMDSBeacon() : Message(MSG_MDS_BEACON) {}
-  MMDSBeacon(int st, version_t se) : Message(MSG_MDS_BEACON), 
-                                    state(st), seq(se) { }
+  MMDSBeacon(entity_inst_t i, int st, version_t se) : 
+    Message(MSG_MDS_BEACON), 
+    inst(i), state(st), seq(se) { }
 
+  entity_inst_t& get_mds_inst() { return inst; }
   int get_state() { return state; }
   version_t get_seq() { return seq; }
   char *get_type_name() { return "mdsbeacon"; }
 
   void print(ostream& out) {
-    out << "mdsbeacon(" << MDSMap::get_state_name(state) 
+    out << "mdsbeacon(" << inst
+       << " " << MDSMap::get_state_name(state) 
        << " seq " << seq << ")";
   }
   
   void encode_payload() {
-    payload.append((char*)&state, sizeof(state));
-    payload.append((char*)&seq, sizeof(seq));
+    ::_encode(inst, payload);
+    ::_encode(state, payload);
+    ::_encode(seq, payload);
   }
   void decode_payload() {
     int off = 0;
-    payload.copy(off, sizeof(state), (char*)&state);
-    off += sizeof(state);
-    payload.copy(off, sizeof(seq), (char*)&seq);
-    off += sizeof(seq);
+    ::_decode(inst, payload, off);
+    ::_decode(state, payload, off);
+    ::_decode(seq, payload, off);
   }
 };
 
index d5fd8ae64017a8fdfa5cc75681d47425ae59106d..19d25dd7a4d775f5974db5aa465270dad6321156 100644 (file)
@@ -22,9 +22,13 @@ using std::vector;
 
 class MMonCommand : public Message {
  public:
+  entity_inst_t inst;
   vector<string> cmd;
 
   MMonCommand() : Message(MSG_MON_COMMAND) {}
+  MMonCommand(entity_inst_t i) : 
+    Message(MSG_MON_COMMAND),
+    inst(i) { }
   
   virtual char *get_type_name() { return "mon_command"; }
   void print(ostream& o) {
@@ -37,10 +41,12 @@ class MMonCommand : public Message {
   }
   
   void encode_payload() {
+    ::_encode(inst, payload);
     ::_encode(cmd, payload);
   }
   void decode_payload() {
     int off = 0;
+    ::_decode(inst, payload, off);
     ::_decode(cmd, payload, off);
   }
 };
diff --git a/branches/sage/pgs/messages/MMonElection.h b/branches/sage/pgs/messages/MMonElection.h
new file mode 100644 (file)
index 0000000..14a29af
--- /dev/null
@@ -0,0 +1,63 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MMONELECTION_H
+#define __MMONELECTION_H
+
+#include "msg/Message.h"
+
+
+class MMonElection : public Message {
+public:
+  static const int OP_PROPOSE = 1;
+  static const int OP_ACK     = 2;
+  static const int OP_NAK     = 3;
+  static const int OP_VICTORY = 4;
+  static const char *get_opname(int o) {
+    switch (o) {
+    case OP_PROPOSE: return "propose";
+    case OP_ACK: return "ack";
+       case OP_NAK: return "nak";
+    case OP_VICTORY: return "victory";
+    default: assert(0); return 0;
+    }
+  }
+  
+  int32_t op;
+  epoch_t epoch;
+
+  MMonElection() : Message(MSG_MON_ELECTION) {}
+  MMonElection(int o, epoch_t e) : 
+       Message(MSG_MON_ELECTION), 
+       op(o), epoch(e) {}
+  
+  char *get_type_name() { return "election"; }
+  void print(ostream& out) {
+       out << "election(" << get_opname(op) << " " << epoch << ")";
+  }
+  
+  void encode_payload() {
+       ::_encode(op, payload);
+       ::_encode(epoch, payload);
+  }
+  void decode_payload() {
+       int off = 0;
+       ::_decode(op, payload, off);
+       ::_decode(epoch, payload, off);
+  }
+
+};
+
+#endif
diff --git a/branches/sage/pgs/messages/MMonElectionAck.h b/branches/sage/pgs/messages/MMonElectionAck.h
deleted file mode 100644 (file)
index 14f8b7c..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __MMONELECTIONACK_H
-#define __MMONELECTIONACK_H
-
-#include "msg/Message.h"
-
-
-class MMonElectionAck : public Message {
- public:
-  MMonElectionAck() : Message(MSG_MON_ELECTION_ACK) {}
-  
-  virtual char *get_type_name() { return "election_ack"; }
-
-  void encode_payload() {}
-  void decode_payload() {}
-};
-
-#endif
diff --git a/branches/sage/pgs/messages/MMonElectionPropose.h b/branches/sage/pgs/messages/MMonElectionPropose.h
deleted file mode 100644 (file)
index 7ec54b6..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __MMONELECTIONPROPOSE_H
-#define __MMONELECTIONPROPOSE_H
-
-#include "msg/Message.h"
-
-
-class MMonElectionPropose : public Message {
- public:
-  MMonElectionPropose() : Message(MSG_MON_ELECTION_PROPOSE) {}
-  
-  virtual char *get_type_name() { return "election_propose"; }
-
-  void encode_payload() {}
-  void decode_payload() {}
-
-};
-
-#endif
diff --git a/branches/sage/pgs/messages/MMonElectionVictory.h b/branches/sage/pgs/messages/MMonElectionVictory.h
deleted file mode 100644 (file)
index 47211ae..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __MMONELECTIONVICTORY_H
-#define __MMONELECTIONVICTORY_H
-
-#include "msg/Message.h"
-
-
-class MMonElectionVictory : public Message {
- public:
-  //set<int> active_set;
-
-  MMonElectionVictory(/*set<int>& as*/) : Message(MSG_MON_ELECTION_VICTORY)//,
-       //active_set(as) 
-       {}
-  
-  virtual char *get_type_name() { return "election_victory"; }
-  
-  void encode_payload() {
-    //::_encode(active_set, payload);
-  }
-  void decode_payload() {
-    //int off = 0;
-    //::_decode(active_set, payload, off);
-  }
-};
-
-#endif
index 40cdadc81c57d4cdd81dde69d3a38de0de3de24f..7210b179c9a42521c1035c34f71148b362ed6b38 100644 (file)
 #define __MMONPAXOS_H
 
 #include "msg/Message.h"
+#include "mon/mon_types.h"
 
 class MMonPaxos : public Message {
  public:
   // op types
-  const static int OP_COLLECT = 1;   // proposer: propose round
-  const static int OP_LAST = 2;                 // voter:    accept proposed round
-  const static int OP_BEGIN = 4;        // proposer: value proposed for this round
-  const static int OP_ACCEPT = 5;       // voter:    accept propsed value
-  const static int OP_COMMIT = 7;   // proposer: notify learners of agreed value
+  const static int OP_COLLECT =   1; // proposer: propose round
+  const static int OP_LAST =      2; // voter:    accept proposed round
+  const static int OP_BEGIN =     3; // proposer: value proposed for this round
+  const static int OP_ACCEPT =    4; // voter:    accept propsed value
+  const static int OP_COMMIT =    5; // proposer: notify learners of agreed value
+  const static int OP_LEASE =     6; // leader: extend peon lease
+  const static int OP_LEASE_ACK = 7; // peon: lease ack
   const static char *get_opname(int op) {
     switch (op) {
     case OP_COLLECT: return "collect";
@@ -33,52 +36,61 @@ class MMonPaxos : public Message {
     case OP_BEGIN: return "begin";
     case OP_ACCEPT: return "accept";
     case OP_COMMIT: return "commit";
+    case OP_LEASE: return "lease";
+    case OP_LEASE_ACK: return "lease_ack";
     default: assert(0); return 0;
     }
   }
 
-  // which state machine?
-  int op;   
-  int machine_id;
-  
+  epoch_t epoch;   // monitor epoch
+  int op;          // paxos op
+  int machine_id;  // which state machine?
+
   version_t last_committed;  // i've committed to
   version_t pn_from;         // i promise to accept after
   version_t pn;              // with with proposal
-  version_t old_accepted_pn;     // previous pn, if we are a LAST with an uncommitted value
+  version_t uncommitted_pn;     // previous pn, if we are a LAST with an uncommitted value
+  utime_t lease_expire;
 
   map<version_t,bufferlist> values;
 
   MMonPaxos() : Message(MSG_MON_PAXOS) {}
-  MMonPaxos(int o, int mid) : Message(MSG_MON_PAXOS),
-                             op(o), machine_id(mid),
-                             last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { }
+  MMonPaxos(epoch_t e, int o, int mid) : 
+    Message(MSG_MON_PAXOS),
+    epoch(e),
+    op(o), machine_id(mid),
+    last_committed(0), pn_from(0), pn(0), uncommitted_pn(0) { }
   
   virtual char *get_type_name() { return "paxos"; }
   
   void print(ostream& out) {
-    out << "paxos(m" << machine_id
+    out << "paxos(" << get_paxos_name(machine_id)
        << " " << get_opname(op) << " lc " << last_committed
-       << " pn " << pn << " opn " << old_accepted_pn
+       << " pn " << pn << " opn " << uncommitted_pn
        << ")";
   }
 
   void encode_payload() {
+    ::_encode(epoch, payload);
     ::_encode(op, payload);
     ::_encode(machine_id, payload);
     ::_encode(last_committed, payload);
     ::_encode(pn_from, payload);
     ::_encode(pn, payload);
-    ::_encode(old_accepted_pn, payload);
+    ::_encode(uncommitted_pn, payload);
+    ::_encode(lease_expire, payload);
     ::_encode(values, payload);
   }
   void decode_payload() {
     int off = 0;
+    ::_decode(epoch, payload, off);
     ::_decode(op, payload, off);
     ::_decode(machine_id, payload, off);
     ::_decode(last_committed, payload, off);
     ::_decode(pn_from, payload, off);   
     ::_decode(pn, payload, off);   
-    ::_decode(old_accepted_pn, payload, off);
+    ::_decode(uncommitted_pn, payload, off);
+    ::_decode(lease_expire, payload, off);
     ::_decode(values, payload, off);
   }
 };
index 9ae5a68d6c59afa8e40674f6ba538546de0594ac..00c94ad1a2a801d93a3602e089ba7f30dc8d0ce7 100644 (file)
 
 class MOSDBoot : public Message {
  public:
+  entity_inst_t inst;
   OSDSuperblock sb;
 
   MOSDBoot() {}
-  MOSDBoot(OSDSuperblock& s) : 
+  MOSDBoot(entity_inst_t i, OSDSuperblock& s) : 
     Message(MSG_OSD_BOOT),
+    inst(i),
     sb(s) {
   }
 
-  char *get_type_name() { return "oboot"; }
+  char *get_type_name() { return "osd_boot"; }
+  void print(ostream& out) {
+    out << "osd_boot(" << inst << ")";
+  }
   
   void encode_payload() {
-    payload.append((char*)&sb, sizeof(sb));
+    ::_encode(inst, payload);
+    ::_encode(sb, payload);
   }
   void decode_payload() {
     int off = 0;
-    payload.copy(off, sizeof(sb), (char*)&sb);
-    off += sizeof(sb);
+    ::_decode(inst, payload, off);
+    ::_decode(sb, payload, off);
   }
 };
 
index 5bc7a5d5ee9f61a70fc19cd2773071236015fa87..965d622a5f5e22d13f9577500b9c3c695beaed8d 100644 (file)
 
 class MOSDFailure : public Message {
  public:
+  entity_inst_t from;
   entity_inst_t failed;
   epoch_t       epoch;
 
   MOSDFailure() {}
-  MOSDFailure(entity_inst_t f, epoch_t e) : 
+  MOSDFailure(entity_inst_t fr, entity_inst_t f, epoch_t e) : 
     Message(MSG_OSD_FAILURE),
-    failed(f), epoch(e) {}
+    from(fr), failed(f), epoch(e) {}
  
+  entity_inst_t get_from() { return from; }
   entity_inst_t get_failed() { return failed; }
   epoch_t get_epoch() { return epoch; }
 
   void decode_payload() {
     int off = 0;
-    payload.copy(off, sizeof(failed), (char*)&failed);
-    off += sizeof(failed);
-    payload.copy(off, sizeof(epoch), (char*)&epoch);
-    off += sizeof(epoch);
+    ::_decode(from, payload, off);
+    ::_decode(failed, payload, off);
+    ::_decode(epoch, payload, off);
   }
   void encode_payload() {
-    payload.append((char*)&failed, sizeof(failed));
-    payload.append((char*)&epoch, sizeof(epoch));
+    ::_encode(from, payload);
+    ::_encode(failed, payload);
+    ::_encode(epoch, payload);
   }
 
-  virtual char *get_type_name() { return "osdfail"; }
+  virtual char *get_type_name() { return "osd_failure"; }
+  void print(ostream& out) {
+    out << "osd_failure(" << failed << " e" << epoch << ")";
+  }
 };
 
 #endif
index 5158ce7d3ed83655cb876a2b0d62ca422599143a..68e1b7d137dae1f259ee593d7396a3f077b7b496 100644 (file)
@@ -23,7 +23,6 @@ class MOSDGetMap : public Message {
  public:
   epoch_t since;
 
-  //MOSDGetMap() : since(0) {}
   MOSDGetMap(epoch_t s=0) : 
     Message(MSG_OSD_GETMAP),
     since(s) {
@@ -31,7 +30,10 @@ class MOSDGetMap : public Message {
 
   epoch_t get_since() { return since; }
 
-  char *get_type_name() { return "getomap"; }
+  char *get_type_name() { return "get_osd_map"; }
+  void print(ostream& out) {
+    out << "get_osd_map(since " << since << ")";
+  }
   
   void encode_payload() {
     payload.append((char*)&since, sizeof(since));
index fa8ca2d05c455bc090dea2e1942deb0a9fbdbe58..76a7576ee6066bb840ec83ec19daf74724fb0bea 100644 (file)
@@ -71,17 +71,17 @@ void *fakemessenger_thread(void *ptr)
 {
   lock.Lock();
   while (1) {
+    if (fm_shutdown) break;
+    fakemessenger_do_loop_2();
+    
+    if (directory.empty()) break;
+    
     dout(20) << "thread waiting" << endl;
     if (fm_shutdown) break;
     awake = false;
     cond.Wait(lock);
     awake = true;
     dout(20) << "thread woke up" << endl;
-    if (fm_shutdown) break;
-
-    fakemessenger_do_loop_2();
-
-    if (directory.empty()) break;
   }
   lock.Unlock();
 
@@ -185,7 +185,7 @@ int fakemessenger_do_loop_2()
       }
     }
     
-    // deal with shutdowns.. dleayed to avoid concurrent directory modification
+    // deal with shutdowns.. delayed to avoid concurrent directory modification
     if (!shutdown_set.empty()) {
       for (set<entity_addr_t>::iterator it = shutdown_set.begin();
            it != shutdown_set.end();
@@ -311,7 +311,8 @@ int FakeMessenger::send_message(Message *m, entity_inst_t inst, int port, int fr
 #endif
 
   // queue
-  if (directory.count(inst.addr)) {
+  if (directory.count(inst.addr) &&
+      shutdown_set.count(inst.addr) == 0) {
     dout(1) << "--> " << get_myname() << " -> " << inst.name << " --- " << *m << endl;
     directory[inst.addr]->queue_incoming(m);
   } else {
index 39510c47b9bd3f5dc7a93e98cebda6112a61115a..cfcde208867f0bb694fae10dd7379cfd12705d38 100644 (file)
@@ -15,9 +15,7 @@ using namespace std;
 #include "messages/MMonCommandAck.h"
 #include "messages/MMonPaxos.h"
 
-#include "messages/MMonElectionAck.h"
-#include "messages/MMonElectionPropose.h"
-#include "messages/MMonElectionVictory.h"
+#include "messages/MMonElection.h"
 
 #include "messages/MPing.h"
 #include "messages/MPingAck.h"
@@ -122,14 +120,8 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
     m = new MMonPaxos;
     break;
 
-  case MSG_MON_ELECTION_PROPOSE:
-    m = new MMonElectionPropose;
-    break;
-  case MSG_MON_ELECTION_ACK:
-    m = new MMonElectionAck;
-    break;
-  case MSG_MON_ELECTION_VICTORY:
-    m = new MMonElectionVictory;
+  case MSG_MON_ELECTION:
+    m = new MMonElection;
     break;
 
   case MSG_PING:
index c7e76297553ed19362a2b4a1384c0cbc1c11638e..8d43684e66e77cc07720be0fa2d7f4033d02c227 100644 (file)
@@ -26,9 +26,7 @@
 #define MSG_MON_COMMAND_ACK        14
 
 
-#define MSG_MON_ELECTION_ACK       15
-#define MSG_MON_ELECTION_PROPOSE   16
-#define MSG_MON_ELECTION_VICTORY   17
+#define MSG_MON_ELECTION           15
 
 #define MSG_MON_OSDMAP_INFO            20
 #define MSG_MON_OSDMAP_LEASE           21
index 4f9d001a3c459fd29911ffea09374bce7e6c3939..355855e2afc91c72ec2b17e2ac77ec3bc85c112e 100644 (file)
@@ -281,7 +281,7 @@ int OSD::init()
     
     // announce to monitor i exist and have booted.
     int mon = monmap->pick_mon();
-    messenger->send_message(new MOSDBoot(superblock), monmap->get_inst(mon));
+    messenger->send_message(new MOSDBoot(messenger->get_myinst(), superblock), monmap->get_inst(mon));
     
     // start the heart
     timer.add_event_after(g_conf.osd_heartbeat_interval, new C_Heartbeat(this));
@@ -898,7 +898,7 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst)
             << ", dropping and reporting to mon" << mon 
            << " " << *m
             << dendl;
-    messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()),
+    messenger->send_message(new MOSDFailure(messenger->get_myinst(), inst, osdmap->get_epoch()),
                             monmap->get_inst(mon));
     delete m;
   } else if (dest.is_mon()) {
@@ -1166,7 +1166,6 @@ void OSD::advance_map(ObjectStore::Transaction& t)
     int maxraid = g_conf.osd_max_raid_width;
     dout(1) << "mkfs    " << minrep << ".." << maxrep << " replicas, " 
            << minraid << ".." << maxraid << " osd raid groups" << dendl;
-    assert(osdmap->get_epoch() == 1);
 
     //cerr << "osdmap " << osdmap->get_ctime() << " logger start " << logger->get_start() << dendl;
     logger->set_start( osdmap->get_ctime() );
@@ -1490,7 +1489,7 @@ void OSD::get_map(epoch_t epoch, OSDMap &m)
       incs.push_front(inc);
     }
   }
-  assert(e > 0);
+  assert(e >= 0);
 
   // apply incrementals
   for (e++; e <= epoch; e++) {
index f8c5b3ebdd036423e9baf94eabb9947bf9fa0adb..6086ea09daae061874d8f90b1b66890e4e8073b8 100644 (file)
@@ -87,6 +87,11 @@ public:
     epoch_t epoch;   // new epoch; we are a diff from epoch-1 to epoch
     epoch_t mon_epoch;  // monitor epoch (election iteration)
     utime_t ctime;
+
+    // full (rare)
+    bufferlist fullmap;  // in leiu of below.
+
+    // incremental
     map<int,entity_inst_t> new_up;
     map<int,entity_inst_t> new_down;
     list<int> new_in;
@@ -103,6 +108,7 @@ public:
       ::_encode(new_in, bl);
       ::_encode(new_out, bl);
       ::_encode(new_overload, bl);
+      ::_encode(fullmap, bl);
     }
     void decode(bufferlist& bl, int& off) {
       bl.copy(off, sizeof(epoch), (char*)&epoch);
@@ -116,6 +122,7 @@ public:
       ::_decode(new_in, bl, off);
       ::_decode(new_out, bl, off);
       ::_decode(new_overload, bl, off);
+      ::_decode(fullmap, bl, off);
     }
 
     Incremental(epoch_t e=0) : epoch(e), mon_epoch(0) {}
@@ -164,8 +171,8 @@ private:
 
   const utime_t& get_ctime() const { return ctime; }
 
-  bool is_mkfs() const { return epoch == 1; }
-  //void set_mkfs() { assert(epoch == 1); }
+  bool is_mkfs() const { return epoch == 2; }
+  bool post_mkfs() const { return epoch > 2; }
 
   /***** cluster state *****/
   int num_osds() { return osds.size(); }
@@ -176,11 +183,15 @@ private:
   const set<int>& get_out_osds() { return out_osds; }
   const map<int,float>& get_overload_osds() { return overload_osds; }
   
+  bool exists(int osd) { return osds.count(osd); }
   bool is_down(int osd) { return down_osds.count(osd); }
-  bool is_up(int osd) { return !is_down(osd); }
+  bool is_up(int osd) { return exists(osd) && !is_down(osd); }
   bool is_out(int osd) { return out_osds.count(osd); }
-  bool is_in(int osd) { return !is_out(osd); }
+  bool is_in(int osd) { return exists(osd) && !is_out(osd); }
   
+  bool have_inst(int osd) {
+    return osd_inst.count(osd);
+  }
   const entity_inst_t& get_inst(int osd) {
     assert(osd_inst.count(osd));
     return osd_inst[osd];
@@ -205,15 +216,13 @@ private:
     mon_epoch = inc.mon_epoch;
     ctime = inc.ctime;
 
-    for (map<int,entity_inst_t>::iterator i = inc.new_up.begin();
-         i != inc.new_up.end(); 
-         i++) {
-      assert(down_osds.count(i->first));
-      down_osds.erase(i->first);
-      assert(osd_inst.count(i->first) == 0);
-      osd_inst[i->first] = i->second;
-      //cout << "epoch " << epoch << " up osd" << i->first << endl;
+    // full map?
+    if (inc.fullmap.length()) {
+      decode(inc.fullmap);
+      return;
     }
+
+    // nope, incremental.
     for (map<int,entity_inst_t>::iterator i = inc.new_down.begin();
          i != inc.new_down.end();
          i++) {
@@ -224,13 +233,6 @@ private:
       osd_inst.erase(i->first);
       //cout << "epoch " << epoch << " down osd" << i->first << endl;
     }
-    for (list<int>::iterator i = inc.new_in.begin();
-         i != inc.new_in.end();
-         i++) {
-      assert(out_osds.count(*i));
-      out_osds.erase(*i);
-      //cout << "epoch " << epoch << " in osd" << *i << endl;
-    }
     for (list<int>::iterator i = inc.new_out.begin();
          i != inc.new_out.end();
          i++) {
@@ -238,17 +240,34 @@ private:
       out_osds.insert(*i);
       //cout << "epoch " << epoch << " out osd" << *i << endl;
     }
-    for (map<int,float>::iterator i = inc.new_overload.begin();
-         i != inc.new_overload.end();
-         i++) {
-      overload_osds[i->first] = i->second;
-    }
     for (list<int>::iterator i = inc.old_overload.begin();
          i != inc.old_overload.end();
          i++) {
       assert(overload_osds.count(*i));
       overload_osds.erase(*i);
     }
+
+    for (map<int,entity_inst_t>::iterator i = inc.new_up.begin();
+         i != inc.new_up.end(); 
+         i++) {
+      assert(down_osds.count(i->first));
+      down_osds.erase(i->first);
+      assert(osd_inst.count(i->first) == 0);
+      osd_inst[i->first] = i->second;
+      //cout << "epoch " << epoch << " up osd" << i->first << endl;
+    }
+    for (list<int>::iterator i = inc.new_in.begin();
+         i != inc.new_in.end();
+         i++) {
+      assert(out_osds.count(*i));
+      out_osds.erase(*i);
+      //cout << "epoch " << epoch << " in osd" << *i << endl;
+    }
+    for (map<int,float>::iterator i = inc.new_overload.begin();
+         i != inc.new_overload.end();
+         i++) {
+      overload_osds[i->first] = i->second;
+    }
   }
 
   // serialize, unserialize
index d97652b5487781c66d51991faca128fd65cd936e..b21cbd26cffc29ed48e7202fddc13ec6bf067e24 100644 (file)
@@ -102,14 +102,29 @@ public:
     list<off_t>    offsets;
     list<size_t>   lengths;
     list<const char*> attrnames;
+    list<string> attrnames2;
     //list< pair<const void*,int> > attrvals;
     list<bufferlist>  attrbls;
 
+    // for reads only (not encoded)
     list<bufferlist*> pbls;
     list<struct stat*> psts;
     list< pair<void*,int*> > pattrvals;
     list< map<string,bufferptr>* > pattrsets;
 
+    const char *get_attrname() {
+      if (attrnames.empty()) 
+       return attrnames2.front().c_str();
+      else
+       return attrnames.front();
+    }
+    void pop_attrname() {
+      if (attrnames.empty()) 
+       attrnames2.pop_front();
+      else
+       attrnames.pop_front();
+    }
+
     void read(object_t oid, off_t off, size_t len, bufferlist *pbl) {
       int op = OP_READ;
       ops.push_back(op);
@@ -232,6 +247,27 @@ public:
     }
 
     // etc.
+
+    void _encode(bufferlist& bl) {
+      ::_encode(ops, bl);
+      ::_encode(bls, bl);
+      ::_encode(oids, bl);
+      ::_encode(cids, bl);
+      ::_encode(offsets, bl);
+      ::_encode(lengths, bl);
+      ::_encode(attrnames, bl);
+      ::_encode(attrbls, bl);
+    }
+    void _decode(bufferlist& bl, int& off) {
+      ::_decode(ops, bl, off);
+      ::_decode(bls, bl, off);
+      ::_decode(oids, bl, off);
+      ::_decode(cids, bl, off);
+      ::_decode(offsets, bl, off);
+      ::_decode(lengths, bl, off);
+      ::_decode(attrnames2, bl, off);
+      ::_decode(attrbls, bl, off);      
+    }
   };
 
 
@@ -264,7 +300,7 @@ public:
       case Transaction::OP_GETATTR:
         {
           object_t oid = t.oids.front(); t.oids.pop_front();
-          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          const char *attrname = t.get_attrname(); t.pop_attrname();
           pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
           *pattrval.second = getattr(oid, attrname, pattrval.first, *pattrval.second);
         }
@@ -314,7 +350,7 @@ public:
       case Transaction::OP_SETATTR:
         {
           object_t oid = t.oids.front(); t.oids.pop_front();
-          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          const char *attrname = t.get_attrname(); t.pop_attrname();
           //pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
           bufferlist bl;
           bl.claim( t.attrbls.front() );
@@ -333,7 +369,7 @@ public:
       case Transaction::OP_RMATTR:
         {
           object_t oid = t.oids.front(); t.oids.pop_front();
-          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          const char *attrname = t.get_attrname(); t.pop_attrname();
           rmattr(oid, attrname, 0);
         }
         break;
@@ -379,7 +415,7 @@ public:
       case Transaction::OP_COLL_SETATTR:
         {
           coll_t cid = t.cids.front(); t.cids.pop_front();
-          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          const char *attrname = t.get_attrname(); t.pop_attrname();
           //pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
           bufferlist bl;
           bl.claim( t.attrbls.front() );
@@ -391,7 +427,7 @@ public:
       case Transaction::OP_COLL_RMATTR:
         {
           coll_t cid = t.cids.front(); t.cids.pop_front();
-          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          const char *attrname = t.get_attrname(); t.pop_attrname();
           collection_rmattr(cid, attrname, 0);
         }
         break;
index 2592bd9ca69f8cf07acc017990b33743d7268f6a..c2d1290102e8b1817383230f4d213be383dfc104 100644 (file)
@@ -650,7 +650,7 @@ void PG::peer(ObjectStore::Transaction& t,
     } else {
       dout(10) << " still active from last started: " << last_started << dendl;
     }
-  } else if (osd->osdmap->get_epoch() > 1) {
+  } else if (osd->osdmap->post_mkfs()) {
     dout(10) << " crashed since epoch " << last_epoch_started_any << dendl;
     state_set(STATE_CRASHED);
   }    
@@ -876,7 +876,7 @@ void PG::activate(ObjectStore::Transaction& t)
 
   // if primary..
   if (role == 0 &&
-      osd->osdmap->get_epoch() > 1) {
+      osd->osdmap->post_mkfs()) {
     // who is clean?
     clean_set.clear();
     if (info.is_clean()) 
index 6039c903271401a2f1f563b0079bec38bcfef4e2..64d2374b5bd9980a240673c7523e1a443b2d6193 100644 (file)
@@ -841,7 +841,7 @@ void Objecter::ms_handle_failure(Message *m, entity_name_t dest, const entity_in
     dout(0) << "ms_handle_failure " << dest << " inst " << inst 
             << ", dropping and reporting to mon" << mon 
             << endl;
-    messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()), 
+    messenger->send_message(new MOSDFailure(messenger->get_myinst(), inst, osdmap->get_epoch()), 
                             monmap->get_inst(mon));
     delete m;
   } else {