]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
merged trunk changes r1424:1461 into branches/sage/mon2
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Sun, 1 Jul 2007 14:35:29 +0000 (14:35 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Sun, 1 Jul 2007 14:35:29 +0000 (14:35 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1462 29311d96-e01e-0410-9327-a35deaab8ce9

branches/sage/mon2/Makefile
branches/sage/mon2/ebofs/Ebofs.cc
branches/sage/mon2/ebofs/Ebofs.h
branches/sage/mon2/ebofs/FileJournal.cc
branches/sage/mon2/ebofs/FileJournal.h
branches/sage/mon2/ebofs/Journal.h
branches/sage/mon2/ebofs/test.ebofs.cc
branches/sage/mon2/ebofs/types.h
branches/sage/mon2/include/buffer.h
branches/sage/mon2/osd/OSD.cc
branches/sage/mon2/osd/ObjectStore.h

index 96c7a57d55227f0ea03e0cf45802aac4e8818368..5a9a77437fb8e365ea13f07b8f938bf02e1605ec 100644 (file)
@@ -38,7 +38,8 @@ EBOFS_OBJS= \
        ebofs/BlockDevice.o\
        ebofs/BufferCache.o\
        ebofs/Ebofs.o\
-       ebofs/Allocator.o
+       ebofs/Allocator.o\
+       ebofs/FileJournal.o
 
 MDS_OBJS= \
        mds/MDS.o\
index a7c192b3901005e12ef1178f95863456b6cf0363..f315d0385016f385f0771bcae4909eae2eda45da 100644 (file)
@@ -16,6 +16,8 @@
 
 #include "Ebofs.h"
 
+#include "FileJournal.h"
+
 #include <errno.h>
 
 #ifndef DARWIN
@@ -50,6 +52,7 @@ int Ebofs::mount()
   ebofs_lock.Lock();
   assert(!mounted);
 
+  // open dev
   int r = dev.open(&idle_kicker);
   if (r < 0) {
     ebofs_lock.Unlock();
@@ -79,6 +82,8 @@ int Ebofs::mount()
   dout(3) << "mount epoch " << super_epoch << endl;
   assert(super_epoch == sb->epoch);
 
+  super_fsid = sb->fsid;
+
   free_blocks = sb->free_blocks;
   limbo_blocks = sb->limbo_blocks;
 
@@ -101,6 +106,43 @@ int Ebofs::mount()
 
   allocator.release_limbo();
 
+  
+  // open journal?
+  if (journalfn) {
+    journal = new FileJournal(this, journalfn);
+    if (journal->open() < 0) {
+      dout(-3) << "mount journal " << journalfn << " open failed" << endl;
+      delete journal;
+      journal = 0;
+    } else {
+      dout(-3) << "mount journal " << journalfn << " opened, replaying" << endl;
+      
+      while (1) {
+       bufferlist bl;
+       epoch_t e;
+       if (!journal->read_entry(bl, e)) {
+         dout(-3) << "mount replay: end of journal, done." << endl;
+         break;
+       }
+
+       if (e < super_epoch) {
+         dout(-3) << "mount replay: skipping old entry in epoch " << e << " < " << super_epoch << endl;
+       }
+       if (e == super_epoch+1) {
+         super_epoch++;
+         dout(-3) << "mount replay: jumped to next epoch " << super_epoch << endl;
+       }
+       assert(e == super_epoch);
+       
+       dout(-3) << "mount replay: applying transaction in epoch " << e << endl;
+       Transaction t;
+       int off = 0;
+       t._decode(bl, off);
+       _apply_transaction(t);
+      }
+    }
+  }
+
   dout(3) << "mount starting commit+finisher threads" << endl;
   commit_thread.create();
   finisher_thread.create();
@@ -108,6 +150,7 @@ int Ebofs::mount()
   dout(1) << "mounted " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl;
   mounted = true;
 
+
   ebofs_lock.Unlock();
   return 0;
 }
@@ -126,6 +169,10 @@ int Ebofs::mkfs()
 
   block_t num_blocks = dev.get_num_blocks();
 
+  // make a super-random fsid
+  srand(time(0) ^ getpid());
+  super_fsid = (lrand48() << 32) ^ mrand48();
+
   free_blocks = 0;
   limbo_blocks = 0;
 
@@ -197,6 +244,18 @@ int Ebofs::mkfs()
 
   dev.close();
 
+
+  // create journal?
+  if (journalfn) {
+    journal = new FileJournal(this, journalfn);
+    if (journal->create() < 0) {
+      dout(3) << "mount journal " << journalfn << " created failed" << endl;
+      delete journal;
+    } else {
+      dout(3) << "mount journal " << journalfn << " created" << endl;
+    }
+  }
+
   dout(2) << "mkfs: " << dev.get_device_name() << " "  << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl;
   ebofs_lock.Unlock();
   return 0;
@@ -272,6 +331,7 @@ void Ebofs::prepare_super(version_t epoch, bufferptr& bp)
   // fill in super
   memset(&sb, 0, sizeof(sb));
   sb.s_magic = EBOFS_MAGIC;
+  sb.fsid = super_fsid;
   sb.epoch = epoch;
   sb.num_blocks = dev.get_num_blocks();
 
@@ -409,6 +469,7 @@ int Ebofs::commit_thread_entry()
               << ", max dirty " << g_conf.ebofs_bc_max_dirty
               << endl;
       
+      if (journal) journal->commit_epoch_start();
       
       // (async) write onodes+condes  (do this first; it currently involves inode reallocation)
       commit_inodes_start();
@@ -453,14 +514,14 @@ int Ebofs::commit_thread_entry()
         alloc_more_node_space();
       }
       
+      // signal journal
+      if (journal) journal->commit_epoch_finish();
+
       // kick waiters
       dout(10) << "commit_thread queueing commit + kicking sync waiters" << endl;
       
-      finisher_lock.Lock();
-      finisher_queue.splice(finisher_queue.end(), commit_waiters[super_epoch-1]);
+      queue_finishers(commit_waiters[super_epoch-1]);
       commit_waiters.erase(super_epoch-1);
-      finisher_cond.Signal();
-      finisher_lock.Unlock();
 
       sync_cond.Signal();
 
@@ -1222,7 +1283,18 @@ void Ebofs::sync(Context *onsafe)
   ebofs_lock.Lock();
   if (onsafe) {
     dirty = true;
-    commit_waiters[super_epoch].push_back(onsafe);
+
+    while (1) {
+      if (journal) {  
+       // journal empty transaction
+       Transaction t;
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   }
   ebofs_lock.Unlock();
 }
@@ -1994,6 +2066,29 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
   ebofs_lock.Lock();
   dout(7) << "apply_transaction start (" << t.ops.size() << " ops)" << endl;
 
+  unsigned r = _apply_transaction(t);
+
+  // journal, wait for commit
+  if (r != 0 && onsafe) {
+    delete onsafe;  // kill callback, but still journal below (in case transaction had side effects)
+    onsafe = 0;
+  }
+  while (1) {
+    if (journal) {
+      bufferlist bl;
+      t._encode(bl);
+      if (journal->submit_entry(bl, onsafe)) break; 
+    }
+    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    break;
+  }
+
+  ebofs_lock.Unlock();
+  return r;
+}
+
+unsigned Ebofs::_apply_transaction(Transaction& t)
+{
   // do ops
   unsigned r = 0;  // bit fields indicate which ops failed.
   int bit = 1;
@@ -2028,7 +2123,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     case Transaction::OP_GETATTR:
       {
         object_t oid = t.oids.front(); t.oids.pop_front();
-        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+       const char *attrname = t.get_attrname(); t.pop_attrname();
         pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
         if ((*(pattrval.second) = _getattr(oid, attrname, pattrval.first, *(pattrval.second))) < 0) {
           dout(7) << "apply_transaction fail on _getattr" << endl;
@@ -2095,7 +2190,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     case Transaction::OP_SETATTR:
       {
         object_t oid = t.oids.front(); t.oids.pop_front();
-        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+       const char *attrname = t.get_attrname(); t.pop_attrname();
         //pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
         bufferlist bl;
         bl.claim( t.attrbls.front() );
@@ -2121,7 +2216,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     case Transaction::OP_RMATTR:
       {
         object_t oid = t.oids.front(); t.oids.pop_front();
-        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+       const char *attrname = t.get_attrname(); t.pop_attrname();
         if (_rmattr(oid, attrname) < 0) {
           dout(7) << "apply_transaction fail on _rmattr" << endl;
           r &= bit;
@@ -2185,7 +2280,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     case Transaction::OP_COLL_SETATTR:
       {
         coll_t cid = t.cids.front(); t.cids.pop_front();
-        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+       const char *attrname = t.get_attrname(); t.pop_attrname();
         //pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
         bufferlist bl;
         bl.claim( t.attrbls.front() );
@@ -2201,7 +2296,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     case Transaction::OP_COLL_RMATTR:
       {
         coll_t cid = t.cids.front(); t.cids.pop_front();
-        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+       const char *attrname = t.get_attrname(); t.pop_attrname();
         if (_collection_rmattr(cid, attrname) < 0) {
           dout(7) << "apply_transaction fail on _collection_rmattr" << endl;
           r &= bit;
@@ -2217,16 +2312,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
     bit = bit << 1;
   }
   
-  dout(7) << "apply_transaction finish (r = " << r << ")" << endl;
-  
-  // set up commit waiter
-  //if (r == 0) {
-  if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
-  //} else {
-  //if (onsafe) delete onsafe;
-  //}
-  
-  ebofs_lock.Unlock();
+  dout(7) << "_apply_transaction finish (r = " << r << ")" << endl;
   return r;
 }
 
@@ -2295,36 +2381,6 @@ int Ebofs::_write(object_t oid, off_t offset, size_t length, bufferlist& bl)
 }
 
 
-/*int Ebofs::write(object_t oid, 
-                 off_t off, size_t len,
-                 bufferlist& bl, bool fsync)
-{
-  // wait?
-  if (fsync) {
-    // wait for flush.
-    Cond cond;
-    bool done;
-    int flush = 1;    // write never returns positive
-    Context *c = new C_Cond(&cond, &done, &flush);
-    int r = write(oid, off, len, bl, c);
-    if (r < 0) return r;
-    
-    ebofs_lock.Lock();
-    {
-      while (!done) 
-        cond.Wait(ebofs_lock);
-      assert(flush <= 0);
-    }
-    ebofs_lock.Unlock();
-    if (flush < 0) return flush;
-    return r;
-  } else {
-    // don't wait for flush.
-    return write(oid, off, len, bl, (Context*)0);
-  }
-}
-*/
-
 int Ebofs::write(object_t oid, 
                  off_t off, size_t len,
                  bufferlist& bl, Context *onsafe)
@@ -2338,7 +2394,17 @@ int Ebofs::write(object_t oid,
   // commit waiter
   if (r > 0) {
     assert((size_t)r == len);
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.write(oid, off, len, bl);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2370,9 +2436,19 @@ int Ebofs::remove(object_t oid, Context *onsafe)
   // do it
   int r = _remove(oid);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.remove(oid);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2445,9 +2521,19 @@ int Ebofs::truncate(object_t oid, off_t size, Context *onsafe)
   
   int r = _truncate(oid, size);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.truncate(oid, size);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2464,9 +2550,19 @@ int Ebofs::clone(object_t from, object_t to, Context *onsafe)
   
   int r = _clone(from, to);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.clone(from, to);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2640,9 +2736,19 @@ int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t siz
   ebofs_lock.Lock();
   int r = _setattr(oid, name, value, size);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.setattr(oid, name, value, size);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2673,9 +2779,19 @@ int Ebofs::setattrs(object_t oid, map<string,bufferptr>& attrset, Context *onsaf
   ebofs_lock.Lock();
   int r = _setattrs(oid, attrset);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.setattrs(oid, attrset);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2758,9 +2874,19 @@ int Ebofs::rmattr(object_t oid, const char *name, Context *onsafe)
 
   int r = _rmattr(oid, name);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.rmattr(oid, name);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2835,9 +2961,19 @@ int Ebofs::create_collection(coll_t cid, Context *onsafe)
 
   int r = _create_collection(cid);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.create_collection(cid);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2882,9 +3018,19 @@ int Ebofs::destroy_collection(coll_t cid, Context *onsafe)
 
   int r = _destroy_collection(cid);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.remove_collection(cid);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2936,9 +3082,19 @@ int Ebofs::collection_add(coll_t cid, object_t oid, Context *onsafe)
 
   int r = _collection_add(cid, oid);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.collection_add(cid, oid);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -2977,9 +3133,19 @@ int Ebofs::collection_remove(coll_t cid, object_t oid, Context *onsafe)
 
   int r = _collection_remove(cid, oid);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.collection_remove(cid, oid);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3040,9 +3206,19 @@ int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, s
 
   int r = _collection_setattr(cid, name, value, size);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.collection_setattr(cid, name, value, size);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
@@ -3098,9 +3274,19 @@ int Ebofs::collection_rmattr(coll_t cid, const char *name, Context *onsafe)
 
   int r = _collection_rmattr(cid, name);
 
-  // set up commit waiter
+  // journal, wait for commit
   if (r >= 0) {
-    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    while (1) {
+      if (journal) {
+       Transaction t;
+       t.collection_rmattr(cid, name);
+       bufferlist bl;
+       t._encode(bl);
+       if (journal->submit_entry(bl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
   } else {
     if (onsafe) delete onsafe;
   }
index 91c5eb51b3cda6afb7837039120eb30bdff9e0ee..eb20cf89205316e5476a4a76577951a317dd3b53 100644 (file)
@@ -29,6 +29,7 @@ using namespace __gnu_cxx;
 #include "nodes.h"
 #include "Allocator.h"
 #include "Table.h"
+#include "Journal.h"
 
 #include "common/Mutex.h"
 #include "common/Cond.h"
@@ -40,20 +41,23 @@ typedef pair<coll_t,object_t> coll_object_t;
 
 
 class Ebofs : public ObjectStore {
- protected:
+protected:
   Mutex        ebofs_lock;    // a beautiful global lock
 
   // ** debuggy **
   bool         fake_writes;
 
   // ** super **
+public:
   BlockDevice  dev;
+protected:
   bool         mounted, unmounting, dirty;
   bool         readonly;
   version_t    super_epoch;
   bool         commit_thread_started, mid_commit;
   Cond         commit_cond;   // to wake up the commit thread
   Cond         sync_cond;
+  uint64_t     super_fsid;
 
   map<version_t, list<Context*> > commit_waiters;
 
@@ -71,9 +75,16 @@ class Ebofs : public ObjectStore {
     }
   } commit_thread;
 
-  
+public:
+  uint64_t get_fsid() { return super_fsid; }
+  epoch_t get_super_epoch() { return super_epoch; }
+protected:
 
 
+  // ** journal **
+  char *journalfn;
+  Journal *journal;
+
   // ** allocator **
   block_t      free_blocks, limbo_blocks;
   Allocator    allocator;
@@ -188,6 +199,21 @@ class Ebofs : public ObjectStore {
   bool           finisher_stop;
   list<Context*> finisher_queue;
 
+public:
+  void queue_finisher(Context *c) {
+    finisher_lock.Lock();
+    finisher_queue.push_back(c);
+    finisher_cond.Signal();
+    finisher_lock.Unlock();
+  }
+  void queue_finishers(list<Context*>& ls) {
+    finisher_lock.Lock();
+    finisher_queue.splice(finisher_queue.end(), ls);
+    finisher_cond.Signal();
+    finisher_lock.Unlock();
+  }
+protected:
+
   void *finisher_thread_entry();
   class FinisherThread : public Thread {
     Ebofs *ebofs;
@@ -204,12 +230,13 @@ class Ebofs : public ObjectStore {
 
 
  public:
-  Ebofs(char *devfn) : 
+  Ebofs(char *devfn, char *jfn=0) : 
     fake_writes(false),
     dev(devfn), 
     mounted(false), unmounting(false), dirty(false), readonly(false), 
     super_epoch(0), commit_thread_started(false), mid_commit(false),
     commit_thread(this),
+    journalfn(jfn), journal(0),
     free_blocks(0), limbo_blocks(0),
     allocator(this),
     nodepool(ebofs_lock),
@@ -222,6 +249,11 @@ class Ebofs : public ObjectStore {
     finisher_stop(false), finisher_thread(this) {
     for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
       free_tab[i] = 0;
+    if (!journalfn) {
+      journalfn = new char[strlen(devfn) + 100];
+      strcpy(journalfn, devfn);
+      strcat(journalfn, ".journal");
+    }
   }
   ~Ebofs() {
   }
@@ -298,6 +330,8 @@ class Ebofs : public ObjectStore {
 
 private:
   // private interface -- use if caller already holds lock
+  unsigned _apply_transaction(Transaction& t);
+
   int _read(object_t oid, off_t off, size_t len, bufferlist& bl);
   int _is_cached(object_t oid, off_t off, size_t len);
   int _stat(object_t oid, struct stat *st);
index 87ea20199cd24625eed31811a0160f5b103efa2d..74edecf41c71a023ebc5b36785e3bc80f2a3e041 100644 (file)
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 // vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
 
 #include "FileJournal.h"
 #include "Ebofs.h"
 
-#include "config.h"
-#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ").journal "
-#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ").journal "
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
 
+#include "config.h"
+#undef dout
+#define dout(x) if (true || x <= g_conf.debug_ebofs) cout << "ebofs(" << ebofs->dev.get_device_name() << ").journal "
+#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << ebofs->dev.get_device_name() << ").journal "
 
 
-void FileJournal::create()
+int FileJournal::create()
 {
   dout(1) << "create " << fn << endl;
 
   // open/create
-  fd = ::open(fn.c_str(), O_CREAT|O_WRONLY);
+  fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
+  if (fd < 0) {
+    dout(1) << "create failed " << errno << " " << strerror(errno) << endl;
+    return -errno;
+  }
   assert(fd > 0);
 
-  ::ftruncate(fd);
-  ::fchmod(fd, 0644);
+  //::ftruncate(fd, 0);
+  //::fchmod(fd, 0644);
+
+  // get size
+  struct stat st;
+  ::fstat(fd, &st);
+  dout(1) << "open " << fn << " " << st.st_size << " bytes" << endl;
+
+  // write empty header
+  header.clear();
+  header.fsid = ebofs->get_fsid();
+  header.max_size = st.st_size;
+  write_header();
+  
+  // writeable.
+  read_pos = 0;
+  write_pos = queue_pos = sizeof(header);
 
   ::close(fd);
-}
 
+  return 0;
+}
 
-void FileJournal::open()
+int FileJournal::open()
 {
-  dout(1) << "open " << fn << endl;
+  //dout(1) << "open " << fn << endl;
 
   // open and file
   assert(fd == 0);
-  fd = ::open(fn.c_str(), O_RDWR);
+  fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
+  if (fd < 0) {
+    dout(1) << "open failed " << errno << " " << strerror(errno) << endl;
+    return -errno;
+  }
   assert(fd > 0);
 
-  // read header?
-  // ***
+  // assume writeable, unless...
+  read_pos = 0;
+  write_pos = queue_pos = sizeof(header);
 
+  // read header?
+  read_header();
+  if (header.num > 0 && header.fsid == ebofs->get_fsid()) {
+    // valid header, pick an offset
+    for (int i=0; i<header.num; i++) {
+      if (header.epoch[i] == ebofs->get_super_epoch()) {
+       dout(2) << "using read_pos header pointer "
+               << header.epoch[i] << " at " << header.offset[i]
+               << endl;
+       read_pos = header.offset[i];
+       write_pos = queue_pos = 0;
+       break;
+      }      
+      else if (header.epoch[i] < ebofs->get_super_epoch()) {
+       dout(2) << "super_epoch is " << ebofs->get_super_epoch() 
+               << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
+               << endl;
+      }
+      else if (header.epoch[i] > ebofs->get_super_epoch()) {
+       dout(2) << "super_epoch is " << ebofs->get_super_epoch() 
+               << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
+               << endl;
+       break;
+      }
+    }
+  }
 
   start_writer();
+
+  return 0;
 }
 
 void FileJournal::close()
@@ -49,7 +119,8 @@ void FileJournal::close()
   stop_writer();
 
   // close
-  assert(q.empty());
+  assert(writeq.empty());
+  assert(commitq.empty());
   assert(fd > 0);
   ::close(fd);
   fd = 0;
@@ -73,12 +144,36 @@ void FileJournal::stop_writer()
 }
 
 
+void FileJournal::print_header()
+{
+  for (int i=0; i<header.num; i++) {
+    if (i && header.offset[i] < header.offset[i-1]) {
+      assert(header.wrap);
+      dout(10) << "header: wrap at " << header.wrap << endl;
+    }
+    dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << endl;
+  }
+  //if (header.wrap) dout(10) << "header: wrap at " << header.wrap << endl;
+}
+void FileJournal::read_header()
+{
+  dout(10) << "read_header" << endl;
+  memset(&header, 0, sizeof(header));  // zero out (read may fail)
+  ::lseek(fd, 0, SEEK_SET);
+  int r = ::read(fd, &header, sizeof(header));
+  if (r < 0) 
+    dout(0) << "read_header error " << errno << " " << strerror(errno) << endl;
+  print_header();
+}
 void FileJournal::write_header()
 {
-  dout(10) << "write_header" << endl;
-  
+  dout(10) << "write_header " << endl;
+  print_header();
+
   ::lseek(fd, 0, SEEK_SET);
-  ::write(fd, &header, sizeof(header));
+  int r = ::write(fd, &header, sizeof(header));
+  if (r < 0) 
+    dout(0) << "write_header error " << errno << " " << strerror(errno) << endl;
 }
 
 
@@ -91,6 +186,7 @@ void FileJournal::write_thread_entry()
     if (writeq.empty()) {
       // sleep
       dout(20) << "write_thread_entry going to sleep" << endl;
+      assert(write_pos == queue_pos);
       write_cond.Wait(write_lock);
       dout(20) << "write_thread_entry woke up" << endl;
       continue;
@@ -99,24 +195,35 @@ void FileJournal::write_thread_entry()
     // do queued writes
     while (!writeq.empty()) {
       // grab next item
-      epoch_t e = writeq.front().first;
+      epoch_t epoch = writeq.front().first;
       bufferlist bl;
       bl.claim(writeq.front().second);
       writeq.pop_front();
       Context *oncommit = commitq.front();
       commitq.pop_front();
       
-      dout(15) << "write_thread_entry writing " << bottom << " : " 
+      // wrap?
+      if (write_pos == header.wrap) {
+       dout(15) << "write_thread_entry wrapped write_pos at " << write_pos << " to " << sizeof(header_t) << endl;
+       assert(header.wrap == write_pos);
+       write_header();
+       write_pos = sizeof(header_t);
+      }
+
+      // write!
+      dout(15) << "write_thread_entry writing " << write_pos << " : " 
               << bl.length() 
-              << " epoch " << e
+              << " epoch " << epoch
               << endl;
       
-      // write epoch, len, data.
-      ::fseek(fd, bottom, SEEK_SET);
-      ::write(fd, &e, sizeof(e));
-      
-      uint32_t len = bl.length();
-      ::write(fd, &len, sizeof(len));
+      // write entry header
+      entry_header_t h;
+      h.epoch = epoch;
+      h.len = bl.length();
+      h.make_magic(write_pos, header.fsid);
+
+      ::lseek(fd, write_pos, SEEK_SET);
+      ::write(fd, &h, sizeof(h));
       
       for (list<bufferptr>::const_iterator it = bl.buffers().begin();
           it != bl.buffers().end();
@@ -124,14 +231,21 @@ void FileJournal::write_thread_entry()
        if ((*it).length() == 0) continue;  // blank buffer.
        ::write(fd, (char*)(*it).c_str(), (*it).length() );
       }
+
+      ::write(fd, &h, sizeof(h));
       
       // move position pointer
-      bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length();
+      write_pos += 2*sizeof(entry_header_t) + bl.length();
       
-      // do commit callback
       if (oncommit) {
-       oncommit->finish(0);
-       delete oncommit;
+       if (1) {
+         // queue callback
+         ebofs->queue_finisher(oncommit);
+       } else {
+         // callback now
+         oncommit->finish(0);
+         delete oncommit;
+       }
       }
     }
   }
@@ -140,61 +254,202 @@ void FileJournal::write_thread_entry()
   dout(10) << "write_thread_entry finish" << endl;
 }
 
-void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
+bool FileJournal::submit_entry(bufferlist& e, Context *oncommit)
 {
-  dout(10) << "submit_entry " << bottom << " : " << e.length()
-          << " epoch " << ebofs->super_epoch
+  assert(queue_pos != 0);  // bad create(), or journal didn't replay to completion.
+
+  // ** lock **
+  Mutex::Locker locker(write_lock);
+
+  // wrap? full?
+  off_t size = 2*sizeof(entry_header_t) + e.length();
+
+  if (full) return false;  // already marked full.
+
+  if (header.wrap) {
+    // we're wrapped.  don't overwrite ourselves.
+    if (queue_pos + size >= header.offset[0]) {
+      dout(10) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
+              << " >= " << header.offset[0]
+              << endl;
+      full = true;
+      print_header();
+      return false;      
+    }
+  } else {
+    // we haven't wrapped.  
+    if (queue_pos + size >= header.max_size) {
+      // is there room if we wrap?
+      if ((off_t)sizeof(header_t) + size < header.offset[0]) {
+       // yes!
+       dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << endl;
+       header.wrap = queue_pos;
+       queue_pos = sizeof(header_t);
+       header.push(ebofs->get_super_epoch(), queue_pos);
+      } else {
+       // no room.
+       dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
+                << " >= " << header.max_size
+                << endl;
+       full = true;
+       return false;
+      }
+    }
+  }
+  
+  dout(10) << "submit_entry " << queue_pos << " : " << e.length()
+          << " epoch " << ebofs->get_super_epoch()
           << " " << oncommit << endl;
   
   // dump on queue
-  writeq.push_back(pair<epoch_t,bufferlist>(ebofs->super_epoch, e));
+  writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
   commitq.push_back(oncommit);
-
+  
+  queue_pos += size;
+  
   // kick writer thread
   write_cond.Signal();
+
+  return true;
 }
 
 
 void FileJournal::commit_epoch_start()
 {
-  dout(10) << "commit_epoch_start" << endl;
+  dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1 
+          << " -- new epoch " << ebofs->get_super_epoch()
+          << endl;
 
-  write_lock.Lock();
-  {
-    header.epoch2 = ebofs->super_epoch;
-    header.top2 = bottom;
-    write_header();
-  }
-  write_lock.Unlock();
+  Mutex::Locker locker(write_lock);
+
+  // was full -> empty -> now usable?
+  if (full) {
+    if (header.num != 0) {
+      dout(1) << " journal FULL, ignoring this epoch" << endl;
+      return;
+    }
+    
+    dout(1) << " clearing FULL flag, journal now usable" << endl;
+    full = false;
+  } 
+
+  // note epoch boundary
+  header.push(ebofs->get_super_epoch(), queue_pos);  // note: these entries may not yet be written.
+  //write_header();  // no need to write it now, though...
 }
 
 void FileJournal::commit_epoch_finish()
 {
-  dout(10) << "commit_epoch_finish" << endl;
+  dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << endl;
 
   write_lock.Lock();
   {
-    // update header
-    header.epoch1 = ebofs->super_epoch;
-    header.top1 = header.top2;
-    header.epoch2 = 0;
-    header.top2 = 0;
+    if (full) {
+      // full journal damage control.
+      dout(15) << " journal was FULL, contents now committed, clearing header.  journal still not usable until next epoch." << endl;
+      header.clear();
+      write_pos = queue_pos = sizeof(header_t);
+    } else {
+      // update header -- trim/discard old (committed) epochs
+      while (header.epoch[0] < ebofs->get_super_epoch())
+       header.pop();
+    }
     write_header();
 
-    // flush any unwritten items in previous epoch
-    while (!writeq.empty() &&
-          writeq.front().first < ebofs->super_epoch) {
-      dout(15) << " dropping uncommitted journal item from prior epoch" << endl;
-      writeq.pop_front();
+    // discard any unwritten items in previous epoch, and do callbacks
+    epoch_t epoch = ebofs->get_super_epoch();
+    list<Context*> callbacks;
+    while (!writeq.empty() && writeq.front().first < epoch) {
+      dout(15) << " dropping unwritten and committed " 
+              << write_pos << " : " << writeq.front().second.length()
+              << " epoch " << writeq.front().first 
+              << endl;
+      // finisher?
       Context *oncommit = commitq.front();
+      if (oncommit) callbacks.push_back(oncommit);
+
+      write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length();
+
+      // discard.
+      writeq.pop_front();  
       commitq.pop_front();
-         
-      if (oncommit) {
-       oncommit->finish(0);
-       delete oncommit;
-      }
     }
+    
+    // queue the finishers
+    ebofs->queue_finishers(callbacks);
   }
   write_lock.Unlock();
   
 }
+
+
+void FileJournal::make_writeable()
+{
+  if (read_pos)
+    write_pos = queue_pos = read_pos;
+  else
+    write_pos = queue_pos = sizeof(header_t);
+  read_pos = 0;
+}
+
+
+bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
+{
+  if (!read_pos) {
+    dout(1) << "read_entry -- not readable" << endl;
+    make_writeable();
+    return false;
+  }
+
+  if (read_pos == header.wrap) {
+    // find wrap point
+    for (int i=1; i<header.num; i++) {
+      if (header.offset[i] < read_pos) {
+       assert(header.offset[i-1] < read_pos);
+       read_pos = header.offset[i];
+       break;
+      }
+    }
+    assert(read_pos != header.wrap);
+    dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << endl;
+  }
+
+  // header
+  entry_header_t h;
+  ::lseek(fd, read_pos, SEEK_SET);
+  ::read(fd, &h, sizeof(h));
+  if (!h.check_magic(read_pos, header.fsid)) {
+    dout(1) << "read_entry " << read_pos << " : bad header magic, end of journal" << endl;
+    make_writeable();
+    return false;
+  }
+
+  // body
+  bufferptr bp(h.len);
+  ::read(fd, bp.c_str(), h.len);
+
+  // footer
+  entry_header_t f;
+  ::read(fd, &f, sizeof(h));
+  if (!f.check_magic(read_pos, header.fsid) ||
+      h.epoch != f.epoch ||
+      h.len != f.len) {
+    dout(1) << "read_entry " << read_pos << " : bad footer magic, partially entry, end of journal" << endl;
+    make_writeable();
+    return false;
+  }
+
+
+  // yay!
+  dout(1) << "read_entry " << read_pos << " : " 
+         << " " << h.len << " bytes"
+         << " epoch " << h.epoch 
+         << endl;
+  
+  bl.push_back(bp);
+  epoch = h.epoch;
+
+  read_pos += 2*sizeof(entry_header_t) + h.len;
+
+  return true;
+}
index 6c34f24f339559a425df42d7036f8803060f9454..a26f75ec97ff6b6c390cd3c3b6c82067b4da29a0 100644 (file)
 
 
 #include "Journal.h"
-
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Thread.h"
 
 class FileJournal : public Journal {
 public:
+  /** log header
+   * we allow 3 pointers:
+   *  top/initial,
+   *  one for an epoch boundary,
+   *  and one for a wrap in the ring buffer/journal file.
+   * the epoch boundary one is useful only for speedier recovery in certain cases
+   * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
+   */
   struct header_t {
-    epoch_t epoch1;
-    off_t top1;
-    epoch_t epoch2;
-    off_t top2;
+    uint64_t fsid;
+    int num;
+    off_t wrap;
+    off_t max_size;
+    epoch_t epoch[3];
+    off_t offset[3];
+
+    header_t() : fsid(0), num(0), wrap(0), max_size(0) {}
+
+    void clear() {
+      num = 0;
+      wrap = 0;
+    }
+    void pop() {
+      if (num >= 2 && offset[0] > offset[1]) 
+       wrap = 0;  // we're eliminating a wrap
+      num--;
+      for (int i=0; i<num; i++) {
+       epoch[i] = epoch[i+1];
+       offset[i] = offset[i+1];
+      }
+    }
+    void push(epoch_t e, off_t o) {
+      assert(num < 3);
+      epoch[num] = e;
+      offset[num] = o;
+      num++;
+    }
   } header;
 
+  struct entry_header_t {
+    uint64_t epoch;
+    uint64_t len;
+    uint64_t magic1;
+    uint64_t magic2;
+    
+    void make_magic(off_t pos, uint64_t fsid) {
+      magic1 = pos;
+      magic2 = fsid ^ epoch ^ len;
+    }
+    bool check_magic(off_t pos, uint64_t fsid) {
+      return
+       magic1 == (uint64_t)pos &&
+       magic2 == (fsid ^ epoch ^ len);
+    }
+  };
+
 private:
   string fn;
 
-  off_t max_size;
-  off_t top;            // byte of first entry chronologically
-  off_t bottom;         // byte where next entry goes
-  off_t committing_to;  // offset of epoch boundary, if we are committing
+  bool full;
+  off_t write_pos;      // byte where next entry written goes
+  off_t queue_pos;      // byte where next entry queued for write goes
+
+  off_t read_pos;       // 
 
   int fd;
 
@@ -47,39 +99,44 @@ private:
   Cond write_cond;
   bool write_stop;
 
+  void print_header();
+  void read_header();
   void write_header();
   void start_writer();
   void stop_writer();
   void write_thread_entry();
 
+  void make_writeable();
+
   class Writer : public Thread {
     FileJournal *journal;
   public:
     Writer(FileJournal *fj) : journal(fj) {}
     void *entry() {
-      journal->write_thread();
+      journal->write_thread_entry();
       return 0;
     }
   } write_thread;
 
  public:
-  FileJournal(Ebofs *e, char *f, off_t sz) : 
-    Journal(e),
-    fn(f), max_size(sz),
-    top(0), bottom(0), committing_to(0),
+  FileJournal(Ebofs *e, char *f) : 
+    Journal(e), fn(f),
+    full(false),
+    write_pos(0), queue_pos(0), read_pos(0),
     fd(0),
-    write_stop(false), write_thread(this)
-  { }
+    write_stop(false), write_thread(this) { }
   ~FileJournal() {}
 
-  void create();
-  void open();
+  int create();
+  int open();
   void close();
 
   // writes
-  void submit_entry(bufferlist& e, Context *oncommit);  // submit an item
-  void commit_epoch_start();  // mark epoch boundary
-  void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
+  bool submit_entry(bufferlist& e, Context *oncommit);  // submit an item
+  void commit_epoch_start();   // mark epoch boundary
+  void commit_epoch_finish();  // mark prior epoch as committed (we can expire)
+
+  bool read_entry(bufferlist& bl, epoch_t& e);
 
   // reads
 };
index c05bce5955c5fdb8ddaac9ef19ede19099e0a23f..fb1983c22eafc1672954fd1a745cdb498e4bd10f 100644 (file)
 #ifndef __EBOFS_JOURNAL_H
 #define __EBOFS_JOURNAL_H
 
+class Ebofs;
+
+#include "include/buffer.h"
+#include "include/Context.h"
 
 class Journal {
+protected:
   Ebofs *ebofs;
 
- public:
+public:
   Journal(Ebofs *e) : ebofs(e) { }
   virtual ~Journal() { }
 
-  virtual void create() = 0;
-  virtual void open() = 0;
+  virtual int create() = 0;
+  virtual int open() = 0;
   virtual void close() = 0;
 
   // writes
-  virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
+  virtual bool submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
   virtual void commit_epoch_start() = 0;  // mark epoch boundary
-  virtual void commit_epoch_finish(list<Context*>& ls) = 0; // mark prior epoch as committed (we can expire)
+  virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire)
+  virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
 
   // reads/recovery
   
index 704ec1658182421fcf9497b6ca52fdce50704732..345f49b7a68ca35f94423b4d18964f08addf9723 100644 (file)
@@ -145,6 +145,7 @@ int main(int argc, char **argv)
   char *filename = args[0];
   int seconds = atoi(args[1]);
   int threads = atoi(args[2]);
+  if (!threads) threads = 1;
 
   cout << "dev " << filename << " .. " << threads << " threads .. " << seconds << " seconds" << endl;
 
@@ -153,7 +154,7 @@ int main(int argc, char **argv)
 
 
   // explicit tests
-  if (1) {
+  if (0) {
     // verify that clone() plays nice with partial writes
     object_t oid(1,1);
     bufferptr bp(10000);
index b03bb8a40d9c9a123cded4558aa4c2b778d3a60f..1fa209a3deeb9c67832e0ac82cc6b18ade92979a 100644 (file)
@@ -142,15 +142,16 @@ static const int EBOFS_FREE_BUCKET_BITS = 2;
 
 
 struct ebofs_super {
-  unsigned s_magic;
-  
-  unsigned epoch;             // version of this superblock.
+  uint64_t s_magic;
+  uint64_t fsid;
+
+  epoch_t epoch;             // version of this superblock.
 
-  unsigned num_blocks;        /* # blocks in filesystem */
+  uint64_t num_blocks;        /* # blocks in filesystem */
 
   // some basic stats, for kicks
-  unsigned free_blocks;       /* unused blocks */
-  unsigned limbo_blocks;      /* limbo blocks */
+  uint64_t free_blocks;       /* unused blocks */
+  uint64_t limbo_blocks;      /* limbo blocks */
   //unsigned num_objects;
   //unsigned num_fragmented;
   
index 1f401513f688c90a2bf726477083827ac8e1cca9..1f61d3b892ba5d025047785fd8076f69dbfa5d7a 100644 (file)
@@ -919,6 +919,14 @@ inline void _decode(std::string& s, bufferlist& bl, int& off)
   off += len+1;
 }
 
+// const char* (encode only, string compatible)
+inline void _encode(const char *s, bufferlist& bl) 
+{
+  uint32_t len = strlen(s);
+  _encoderaw(len, bl);
+  bl.append(s, len+1);
+}
+
 // bufferptr (encapsulated)
 inline void _encode(bufferptr& bp, bufferlist& bl) 
 {
index 1d6c7a3193c7fd48cb084ae98afd7a9c87746f51..a857a068d76a352bdb2cb686a9c0a86fbfdd906d 100644 (file)
@@ -3401,7 +3401,6 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t,
   switch (op->get_op()) {
   case OSD_OP_WRLOCK:
     { // lock object
-      //r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit);
       t.setattr(oid, "wrlock", &op->get_client(), sizeof(entity_name_t));
     }
     break;  
index 3a3cec32515ffbd246d43c3db94106615c203598..74818e0470526b99d8491e5c0b9e20655220e6aa 100644 (file)
@@ -102,14 +102,29 @@ public:
     list<off_t>    offsets;
     list<size_t>   lengths;
     list<const char*> attrnames;
+    list<string> attrnames2;
     //list< pair<const void*,int> > attrvals;
     list<bufferlist>  attrbls;
 
+    // for reads only (not encoded)
     list<bufferlist*> pbls;
     list<struct stat*> psts;
     list< pair<void*,int*> > pattrvals;
     list< map<string,bufferptr>* > pattrsets;
 
+    const char *get_attrname() {
+      if (attrnames.empty()) 
+       return attrnames2.front().c_str();
+      else
+       return attrnames.front();
+    }
+    void pop_attrname() {
+      if (attrnames.empty()) 
+       attrnames2.pop_front();
+      else
+       attrnames.pop_front();
+    }
+
     void read(object_t oid, off_t off, size_t len, bufferlist *pbl) {
       int op = OP_READ;
       ops.push_back(op);
@@ -232,6 +247,27 @@ public:
     }
 
     // etc.
+
+    void _encode(bufferlist& bl) {
+      ::_encode(ops, bl);
+      ::_encode(bls, bl);
+      ::_encode(oids, bl);
+      ::_encode(cids, bl);
+      ::_encode(offsets, bl);
+      ::_encode(lengths, bl);
+      ::_encode(attrnames, bl);
+      ::_encode(attrbls, bl);
+    }
+    void _decode(bufferlist& bl, int& off) {
+      ::_decode(ops, bl, off);
+      ::_decode(bls, bl, off);
+      ::_decode(oids, bl, off);
+      ::_decode(cids, bl, off);
+      ::_decode(offsets, bl, off);
+      ::_decode(lengths, bl, off);
+      ::_decode(attrnames2, bl, off);
+      ::_decode(attrbls, bl, off);      
+    }
   };
 
 
@@ -264,7 +300,7 @@ public:
       case Transaction::OP_GETATTR:
         {
           object_t oid = t.oids.front(); t.oids.pop_front();
-          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          const char *attrname = t.get_attrname(); t.pop_attrname();
           pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
           *pattrval.second = getattr(oid, attrname, pattrval.first, *pattrval.second);
         }
@@ -314,7 +350,7 @@ public:
       case Transaction::OP_SETATTR:
         {
           object_t oid = t.oids.front(); t.oids.pop_front();
-          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          const char *attrname = t.get_attrname(); t.pop_attrname();
           //pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
           bufferlist bl;
           bl.claim( t.attrbls.front() );
@@ -333,7 +369,7 @@ public:
       case Transaction::OP_RMATTR:
         {
           object_t oid = t.oids.front(); t.oids.pop_front();
-          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          const char *attrname = t.get_attrname(); t.pop_attrname();
           rmattr(oid, attrname, 0);
         }
         break;
@@ -379,7 +415,7 @@ public:
       case Transaction::OP_COLL_SETATTR:
         {
           coll_t cid = t.cids.front(); t.cids.pop_front();
-          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          const char *attrname = t.get_attrname(); t.pop_attrname();
           //pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
           bufferlist bl;
           bl.claim( t.attrbls.front() );
@@ -391,7 +427,7 @@ public:
       case Transaction::OP_COLL_RMATTR:
         {
           coll_t cid = t.cids.front(); t.cids.pop_front();
-          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          const char *attrname = t.get_attrname(); t.pop_attrname();
           collection_rmattr(cid, attrname, 0);
         }
         break;