]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: serialize journal entries
authorSage Weil <sage@newdream.net>
Fri, 12 Sep 2008 16:36:45 +0000 (09:36 -0700)
committerSage Weil <sage@newdream.net>
Fri, 12 Sep 2008 18:56:49 +0000 (11:56 -0700)
src/os/FileStore.cc
src/os/JournalingObjectStore.cc
src/os/JournalingObjectStore.h

index 3840062ad0296b70c4ca22f8ae98edf1135580be..b35b34898a460a0ad36df901c664328af5a80889 100644 (file)
@@ -401,9 +401,9 @@ int FileStore::mount()
   // get epoch
   sprintf(fn, "%s/commit_epoch", basedir.c_str());
   fd = ::open(fn, O_RDONLY);
-  ::read(fd, &super_epoch, sizeof(super_epoch));
+  ::read(fd, &op_seq, sizeof(op_seq));
   ::close(fd);
-  dout(5) << "mount epoch is " << super_epoch << dendl;
+  dout(5) << "mount op_seq is " << op_seq << dendl;
 
   // journal
   sprintf(fn, "%s.journal", basedir.c_str());
@@ -528,8 +528,18 @@ unsigned FileStore::apply_transaction(Transaction &t, Context *onsafe)
 
   // no btrfs transaction support?
   // or, use trans start/end ioctls?
-  if (!btrfs || btrfs_trans_start_end)
-    return ObjectStore::apply_transaction(t, onsafe);
+  if (!btrfs || btrfs_trans_start_end) {
+    bufferlist tbl;
+    t.encode(tbl);  // apply_transaction modifies t; encode first
+    op_start();
+    int r = ObjectStore::apply_transaction(t);
+    if (r >= 0)
+      journal_transaction(tbl, onsafe);
+    else
+      delete onsafe;
+    op_finish();
+    return r;
+  }
 
   // create transaction
   int len = t.get_btrfs_len();
@@ -951,11 +961,13 @@ int FileStore::remove(coll_t cid, pobject_t oid, Context *onsafe)
   dout(20) << "remove " << cid << " " << oid << dendl;
   char fn[200];
   get_coname(cid, oid, fn);
+  op_start();
   int r = ::unlink(fn);
   if (r == 0) 
     journal_remove(cid, oid, onsafe);
   else
     delete onsafe;
+  op_finish();
   return r < 0 ? -errno:r;
 }
 
@@ -965,8 +977,10 @@ int FileStore::truncate(coll_t cid, pobject_t oid, __u64 size, Context *onsafe)
 
   char fn[200];
   get_coname(cid, oid, fn);
+  op_start();
   int r = ::truncate(fn, size);
   if (r >= 0) journal_truncate(cid, oid, size, onsafe);
+  op_finish();
   return r < 0 ? -errno:r;
 }
 
@@ -983,7 +997,6 @@ int FileStore::read(coll_t cid, pobject_t oid,
     dout(10) << "read couldn't open " << fn << " errno " << errno << " " << strerror(errno) << dendl;
     return -errno;
   }
-  ::flock(fd, LOCK_EX);    // lock for safety
   
   __u64 actual = lseek(fd, offset, SEEK_SET);
   size_t got = 0;
@@ -1000,7 +1013,6 @@ int FileStore::read(coll_t cid, pobject_t oid,
     bptr.set_length(got);   // properly size the buffer
     if (got > 0) bl.push_back( bptr );   // put it in the target bufferlist
   }
-  ::flock(fd, LOCK_UN);
   ::close(fd);
   return got;
 }
@@ -1022,8 +1034,9 @@ int FileStore::write(coll_t cid, pobject_t oid,
     derr(0) << "write couldn't open " << fn << " flags " << flags << " errno " << errno << " " << strerror(errno) << dendl;
     return -errno;
   }
-  ::flock(fd, LOCK_EX);    // lock for safety
   
+  op_start();
+
   // seek
   __u64 actual = ::lseek(fd, offset, SEEK_SET);
   int did = 0;
@@ -1045,7 +1058,6 @@ int FileStore::write(coll_t cid, pobject_t oid,
     derr(0) << "couldn't write to " << fn << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << dendl;
   }
 
-  ::flock(fd, LOCK_UN);
 
   // schedule sync
   if (did >= 0)
@@ -1054,6 +1066,8 @@ int FileStore::write(coll_t cid, pobject_t oid,
     delete onsafe;
 
   ::close(fd);
+
+  op_finish();
   
   return did;
 }
@@ -1072,6 +1086,9 @@ int FileStore::clone(coll_t cid, pobject_t oldoid, pobject_t newoid)
   int n = ::open(nfn, O_CREAT|O_TRUNC|O_WRONLY, 0644);
   if (n < 0)
       return -errno;
+
+  op_start();
+  
   int r = 0;
 #ifndef DARWIN
   if (btrfs)
@@ -1117,6 +1134,9 @@ int FileStore::clone(coll_t cid, pobject_t oldoid, pobject_t newoid)
     }
 #endif
   }
+
+  op_finish();
+
   if (r < 0)
     return -errno;
 
@@ -1136,7 +1156,7 @@ void FileStore::sync_entry()
     sync_cond.WaitInterval(lock, interval);
     lock.Unlock();
 
-    dout(20) << "sync_entry committing " << super_epoch << " " << interval << dendl;
+    dout(20) << "sync_entry committing " << op_seq << " " << interval << dendl;
     commit_start();
 
     // induce an fs sync.
@@ -1144,14 +1164,17 @@ void FileStore::sync_entry()
     char fn[100];
     sprintf(fn, "%s/commit_epoch", basedir.c_str());
     int fd = ::open(fn, O_CREAT|O_WRONLY, 0644);
-    ::write(fd, &super_epoch, sizeof(super_epoch));
+    ::write(fd, &op_seq, sizeof(op_seq));
+
+    commit_started();
+
     ::fsync(fd);  // this should cause the fs's journal to commit.  (on btrfs too.)
     ::close(fd);
 
     commit_finish();
 
     lock.Lock();
-    dout(20) << "sync_entry committed " << super_epoch << dendl;
+    dout(20) << "sync_entry committed to op_seq " << op_seq << dendl;
   }
   lock.Unlock();
 }
@@ -1179,6 +1202,7 @@ int FileStore::setattr(coll_t cid, pobject_t oid, const char *name,
                       Context *onsafe) 
 {
   int r;
+  op_start();
   if (fake_attrs) 
     r = attrs.setattr(cid, oid, name, value, size, onsafe);
   else {
@@ -1193,12 +1217,14 @@ int FileStore::setattr(coll_t cid, pobject_t oid, const char *name,
     journal_setattr(cid, oid, name, value, size, onsafe);
   else
     delete onsafe;
+  op_finish();
   return r < 0 ? -errno:r;
 }
 
 int FileStore::setattrs(coll_t cid, pobject_t oid, map<string,bufferptr>& aset, Context *onsafe) 
 {
   int r;
+  op_start();
   if (fake_attrs) 
     r = attrs.setattrs(cid, oid, aset);
   else {
@@ -1221,6 +1247,7 @@ int FileStore::setattrs(coll_t cid, pobject_t oid, map<string,bufferptr>& aset,
     journal_setattrs(cid, oid, aset, onsafe);
   else
     delete onsafe;
+  op_finish();
   return r < 0 ? -errno:r;
 }
 
@@ -1293,6 +1320,7 @@ int FileStore::getattrs(coll_t cid, pobject_t oid, map<string,bufferptr>& aset)
 int FileStore::rmattr(coll_t cid, pobject_t oid, const char *name, Context *onsafe) 
 {
   int r;
+  op_start();
   if (fake_attrs) 
     r = attrs.rmattr(cid, oid, name, onsafe);
   else {
@@ -1306,6 +1334,7 @@ int FileStore::rmattr(coll_t cid, pobject_t oid, const char *name, Context *onsa
     journal_rmattr(cid, oid, name, onsafe);
   else
     delete onsafe;
+  op_finish();
   return r < 0 ? -errno:r;
 }
 
@@ -1317,6 +1346,7 @@ int FileStore::collection_setattr(coll_t c, const char *name,
                                  Context *onsafe) 
 {
   int r;
+  op_start();
   if (fake_attrs) 
     r = attrs.collection_setattr(c, name, value, size, onsafe);
   else {
@@ -1328,6 +1358,7 @@ int FileStore::collection_setattr(coll_t c, const char *name,
     journal_collection_setattr(c, name, value, size, onsafe);
   else 
     delete onsafe;
+  op_finish();
   return r < 0 ? -errno:r;
 }
 
@@ -1335,6 +1366,7 @@ int FileStore::collection_rmattr(coll_t c, const char *name,
                                 Context *onsafe) 
 {
   int r;
+  op_start();
   if (fake_attrs) 
     r = attrs.collection_rmattr(c, name, onsafe);
   else {
@@ -1342,6 +1374,7 @@ int FileStore::collection_rmattr(coll_t c, const char *name,
     get_cdir(c, fn);
     r = do_removexattr(fn, name);
   }
+  op_finish();
   return r < 0 ? -errno:r;
 }
 
@@ -1379,6 +1412,7 @@ int FileStore::collection_getattr(coll_t c, const char *name, bufferlist& bl)
 int FileStore::collection_setattrs(coll_t cid, map<string,bufferptr>& aset) 
 {
   int r;
+  op_start();
   if (fake_attrs) 
     r = attrs.collection_setattrs(cid, aset);
   else {
@@ -1394,6 +1428,7 @@ int FileStore::collection_setattrs(coll_t cid, map<string,bufferptr>& aset)
   }
   if (r >= 0)
     journal_collection_setattrs(cid, aset, 0);
+  op_finish();
   return r < 0 ? -errno:r;
 }
 
@@ -1461,6 +1496,8 @@ int FileStore::create_collection(coll_t c,
 {
   if (fake_collections) return collections.create_collection(c, onsafe);
   
+  op_start();
+
   char fn[200];
   get_cdir(c, fn);
 
@@ -1470,6 +1507,7 @@ int FileStore::create_collection(coll_t c,
     journal_create_collection(c, onsafe);
   else 
     delete onsafe;
+  op_finish();
   return r < 0 ? -errno:r;
 }
 
@@ -1478,6 +1516,8 @@ int FileStore::destroy_collection(coll_t c,
 {
   if (fake_collections) return collections.destroy_collection(c, onsafe);
 
+  op_start();
+
   char fn[200];
   get_cdir(c, fn);
   char cmd[200];
@@ -1489,6 +1529,8 @@ int FileStore::destroy_collection(coll_t c,
     journal_destroy_collection(c, onsafe);
   else 
     delete onsafe;
+
+  op_finish();
   return 0;
 }
 
@@ -1515,6 +1557,7 @@ int FileStore::collection_add(coll_t c, coll_t cid, pobject_t o,
                              Context *onsafe) 
 {
   int r;
+  op_start();
   if (fake_collections) 
     r = collections.collection_add(c, o, onsafe);
   else {
@@ -1528,6 +1571,7 @@ int FileStore::collection_add(coll_t c, coll_t cid, pobject_t o,
     journal_collection_add(c, cid, o, onsafe);
   else 
     delete onsafe;
+  op_finish();
   return r < 0 ? -errno:r;
 }
 
@@ -1535,6 +1579,7 @@ int FileStore::collection_remove(coll_t c, pobject_t o,
                                 Context *onsafe) 
 {
   int r;
+  op_start();
   if (fake_collections) 
     r = collections.collection_remove(c, o, onsafe);
   else {
@@ -1546,6 +1591,7 @@ int FileStore::collection_remove(coll_t c, pobject_t o,
     journal_collection_remove(c, o, onsafe);
   else
     delete onsafe;
+  op_finish();
   return r < 0 ? -errno:r;
 }
 
index 5fd1c5a94fd2ac7716fa2756f8035c3fc1c38f3c..ac720121d15213c89284d2e0382cb56743902089 100644 (file)
@@ -11,7 +11,7 @@ int JournalingObjectStore::journal_replay()
   if (!journal)
     return 0;
 
-  int err = journal->open(super_epoch);
+  int err = journal->open(op_seq);
   if (err < 0) {
     dout(3) << "journal_replay open failed with" << err
            << " " << strerror(err) << dendl;
@@ -23,26 +23,23 @@ int JournalingObjectStore::journal_replay()
   int count = 0;
   while (1) {
     bufferlist bl;
-    __u64 e;
-    if (!journal->read_entry(bl, e)) {
+    __u64 seq;
+    if (!journal->read_entry(bl, seq)) {
       dout(3) << "journal_replay: end of journal, done." << dendl;
       break;
     }
-    
-    if (e < super_epoch) {
-      dout(3) << "journal_replay: skipping old entry in epoch " << e << " < " << super_epoch << dendl;
+
+    if (seq <= op_seq) {
+      dout(3) << "journal_replay: skipping old op seq " << seq << " <= " << op_seq << dendl;
       continue;
     }
-    if (e == super_epoch+1) {
-      super_epoch++;
-      dout(3) << "journal_replay: jumped to next epoch " << super_epoch << dendl;
-    }
-    assert(e == super_epoch);
+    assert(op_seq == seq-1);
     
-    dout(3) << "journal_replay: applying transaction in epoch " << e << dendl;
+    dout(3) << "journal_replay: applying op seq " << seq << dendl;
     Transaction t(bl);
     apply_transaction(t);
-    count++;
+
+    assert(op_seq == seq);
   }
 
   // done reading, make writeable.
index de43b37d1cd5bb1ee6f24933559eeb3bcf677c9b..020f49516e2ade628473180a292799264fb2360d 100644 (file)
@@ -21,7 +21,8 @@
 
 class JournalingObjectStore : public ObjectStore {
 protected:
-  epoch_t super_epoch;
+  __u64 op_seq;
+  __u64 committing_op_seq;
   Journal *journal;
   Finisher finisher;
   map<version_t, vector<Context*> > commit_waiters;
@@ -48,197 +49,225 @@ protected:
   }
 
   void commit_start() {
+    // suspend new ops...
     op_lock.get_write();
-    super_epoch++;
+  }
+  void commit_started() {
+    // allow new ops
+    // (underlying fs should now be committing all prior ops)
+    committing_op_seq = op_seq;
     op_lock.put_write();
   }
   void commit_finish() {
     if (journal)
-      journal->committed_thru(super_epoch-1);
-    finisher.queue(commit_waiters[super_epoch-1]);
+      journal->committed_thru(committing_op_seq);
+    finisher.queue(commit_waiters[committing_op_seq]);
   }
 
   void queue_commit_waiter(Context *oncommit) {
     if (oncommit) 
-      commit_waiters[super_epoch].push_back(oncommit);
+      commit_waiters[op_seq].push_back(oncommit);
   }
 
   void journal_transaction(Transaction &t, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       bufferlist tbl;
       t.encode(tbl);
-      journal->submit_entry(super_epoch, tbl, onsafe);
+      journal->submit_entry(op_seq, tbl, onsafe);
+    } else
+      queue_commit_waiter(onsafe);
+  }
+  void journal_transaction(bufferlist& tbl, Context *onsafe) {
+    ++op_seq;
+    if (journal && journal->is_writeable()) {
+      journal->submit_entry(op_seq, tbl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
 
   void journal_write(coll_t cid, pobject_t oid, loff_t off, size_t len, const bufferlist& bl, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.write(cid, oid, off, len, bl);
       bufferlist tbl;
       t.encode(tbl);
-      journal->submit_entry(super_epoch, tbl, onsafe);
+      journal->submit_entry(op_seq, tbl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
   
   void journal_zero(coll_t cid, pobject_t oid, loff_t off, size_t len, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.zero(cid, oid, off, len);
       bufferlist tbl;
       t.encode(tbl);
-      journal->submit_entry(super_epoch, tbl, onsafe);
+      journal->submit_entry(op_seq, tbl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
   
   void journal_remove(coll_t cid, pobject_t oid, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.remove(cid, oid);
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
 
   void journal_truncate(coll_t cid, pobject_t oid, loff_t size, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.truncate(cid, oid, size);
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
 
   void journal_clone(coll_t cid, pobject_t from, pobject_t to, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.clone(cid, from, to);
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
 
   void journal_setattr(coll_t cid, pobject_t oid, const char *name, const void *value, size_t size, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.setattr(cid, oid, name, value, size);
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
 
   void journal_setattrs(coll_t cid, pobject_t oid, map<string,bufferptr>& attrset, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.setattrs(cid, oid, attrset);
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
 
   void journal_rmattr(coll_t cid, pobject_t oid, const char *name, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.rmattr(cid, oid, name);
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
 
   void journal_create_collection(coll_t cid, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.create_collection(cid);
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
 
   void journal_destroy_collection(coll_t cid, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.remove_collection(cid);
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
   
   void journal_collection_add(coll_t cid, coll_t ocid, pobject_t oid, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.collection_add(cid, ocid, oid);
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
 
   void journal_collection_remove(coll_t cid, pobject_t oid, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.collection_remove(cid, oid);
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
 
   void journal_collection_setattr(coll_t cid, const char *name, const void *value, size_t size, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.collection_setattr(cid, name, value, size);
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
   
   void journal_collection_setattrs(coll_t cid, map<string,bufferptr>& aset, Context *onsafe) {
+    ++op_seq;
     if (journal && journal->is_writeable()) {
       Transaction t;
       t.collection_setattrs(cid, aset);
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
   
   void journal_sync(Context *onsafe) {
+    ++op_seq;
     if (journal) {  
       // journal empty transaction
       Transaction t;
       bufferlist bl;
       t.encode(bl);
-      journal->submit_entry(super_epoch, bl, onsafe);
+      journal->submit_entry(op_seq, bl, onsafe);
     } else
       queue_commit_waiter(onsafe);
   }
 
 public:
-  JournalingObjectStore() : super_epoch(0), journal(0) { }
+  JournalingObjectStore() : op_seq(0), committing_op_seq(0), journal(0) { }
   
 };