From 20f81f7644aa6242e2944d76bdc44c0b23868672 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 12 Sep 2008 09:36:45 -0700 Subject: [PATCH] filestore: serialize journal entries --- src/os/FileStore.cc | 68 +++++++++++++++++++++++++----- src/os/JournalingObjectStore.cc | 23 +++++------ src/os/JournalingObjectStore.h | 73 +++++++++++++++++++++++---------- 3 files changed, 118 insertions(+), 46 deletions(-) diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 3840062ad0296..b35b34898a460 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -401,9 +401,9 @@ int FileStore::mount() // get epoch sprintf(fn, "%s/commit_epoch", basedir.c_str()); fd = ::open(fn, O_RDONLY); - ::read(fd, &super_epoch, sizeof(super_epoch)); + ::read(fd, &op_seq, sizeof(op_seq)); ::close(fd); - dout(5) << "mount epoch is " << super_epoch << dendl; + dout(5) << "mount op_seq is " << op_seq << dendl; // journal sprintf(fn, "%s.journal", basedir.c_str()); @@ -528,8 +528,18 @@ unsigned FileStore::apply_transaction(Transaction &t, Context *onsafe) // no btrfs transaction support? // or, use trans start/end ioctls? - if (!btrfs || btrfs_trans_start_end) - return ObjectStore::apply_transaction(t, onsafe); + if (!btrfs || btrfs_trans_start_end) { + bufferlist tbl; + t.encode(tbl); // apply_transaction modifies t; encode first + op_start(); + int r = ObjectStore::apply_transaction(t); + if (r >= 0) + journal_transaction(tbl, onsafe); + else + delete onsafe; + op_finish(); + return r; + } // create transaction int len = t.get_btrfs_len(); @@ -951,11 +961,13 @@ int FileStore::remove(coll_t cid, pobject_t oid, Context *onsafe) dout(20) << "remove " << cid << " " << oid << dendl; char fn[200]; get_coname(cid, oid, fn); + op_start(); int r = ::unlink(fn); if (r == 0) journal_remove(cid, oid, onsafe); else delete onsafe; + op_finish(); return r < 0 ? -errno:r; } @@ -965,8 +977,10 @@ int FileStore::truncate(coll_t cid, pobject_t oid, __u64 size, Context *onsafe) char fn[200]; get_coname(cid, oid, fn); + op_start(); int r = ::truncate(fn, size); if (r >= 0) journal_truncate(cid, oid, size, onsafe); + op_finish(); return r < 0 ? -errno:r; } @@ -983,7 +997,6 @@ int FileStore::read(coll_t cid, pobject_t oid, dout(10) << "read couldn't open " << fn << " errno " << errno << " " << strerror(errno) << dendl; return -errno; } - ::flock(fd, LOCK_EX); // lock for safety __u64 actual = lseek(fd, offset, SEEK_SET); size_t got = 0; @@ -1000,7 +1013,6 @@ int FileStore::read(coll_t cid, pobject_t oid, bptr.set_length(got); // properly size the buffer if (got > 0) bl.push_back( bptr ); // put it in the target bufferlist } - ::flock(fd, LOCK_UN); ::close(fd); return got; } @@ -1022,8 +1034,9 @@ int FileStore::write(coll_t cid, pobject_t oid, derr(0) << "write couldn't open " << fn << " flags " << flags << " errno " << errno << " " << strerror(errno) << dendl; return -errno; } - ::flock(fd, LOCK_EX); // lock for safety + op_start(); + // seek __u64 actual = ::lseek(fd, offset, SEEK_SET); int did = 0; @@ -1045,7 +1058,6 @@ int FileStore::write(coll_t cid, pobject_t oid, derr(0) << "couldn't write to " << fn << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << dendl; } - ::flock(fd, LOCK_UN); // schedule sync if (did >= 0) @@ -1054,6 +1066,8 @@ int FileStore::write(coll_t cid, pobject_t oid, delete onsafe; ::close(fd); + + op_finish(); return did; } @@ -1072,6 +1086,9 @@ int FileStore::clone(coll_t cid, pobject_t oldoid, pobject_t newoid) int n = ::open(nfn, O_CREAT|O_TRUNC|O_WRONLY, 0644); if (n < 0) return -errno; + + op_start(); + int r = 0; #ifndef DARWIN if (btrfs) @@ -1117,6 +1134,9 @@ int FileStore::clone(coll_t cid, pobject_t oldoid, pobject_t newoid) } #endif } + + op_finish(); + if (r < 0) return -errno; @@ -1136,7 +1156,7 @@ void FileStore::sync_entry() sync_cond.WaitInterval(lock, interval); lock.Unlock(); - dout(20) << "sync_entry committing " << super_epoch << " " << interval << dendl; + dout(20) << "sync_entry committing " << op_seq << " " << interval << dendl; commit_start(); // induce an fs sync. @@ -1144,14 +1164,17 @@ void FileStore::sync_entry() char fn[100]; sprintf(fn, "%s/commit_epoch", basedir.c_str()); int fd = ::open(fn, O_CREAT|O_WRONLY, 0644); - ::write(fd, &super_epoch, sizeof(super_epoch)); + ::write(fd, &op_seq, sizeof(op_seq)); + + commit_started(); + ::fsync(fd); // this should cause the fs's journal to commit. (on btrfs too.) ::close(fd); commit_finish(); lock.Lock(); - dout(20) << "sync_entry committed " << super_epoch << dendl; + dout(20) << "sync_entry committed to op_seq " << op_seq << dendl; } lock.Unlock(); } @@ -1179,6 +1202,7 @@ int FileStore::setattr(coll_t cid, pobject_t oid, const char *name, Context *onsafe) { int r; + op_start(); if (fake_attrs) r = attrs.setattr(cid, oid, name, value, size, onsafe); else { @@ -1193,12 +1217,14 @@ int FileStore::setattr(coll_t cid, pobject_t oid, const char *name, journal_setattr(cid, oid, name, value, size, onsafe); else delete onsafe; + op_finish(); return r < 0 ? -errno:r; } int FileStore::setattrs(coll_t cid, pobject_t oid, map& aset, Context *onsafe) { int r; + op_start(); if (fake_attrs) r = attrs.setattrs(cid, oid, aset); else { @@ -1221,6 +1247,7 @@ int FileStore::setattrs(coll_t cid, pobject_t oid, map& aset, journal_setattrs(cid, oid, aset, onsafe); else delete onsafe; + op_finish(); return r < 0 ? -errno:r; } @@ -1293,6 +1320,7 @@ int FileStore::getattrs(coll_t cid, pobject_t oid, map& aset) int FileStore::rmattr(coll_t cid, pobject_t oid, const char *name, Context *onsafe) { int r; + op_start(); if (fake_attrs) r = attrs.rmattr(cid, oid, name, onsafe); else { @@ -1306,6 +1334,7 @@ int FileStore::rmattr(coll_t cid, pobject_t oid, const char *name, Context *onsa journal_rmattr(cid, oid, name, onsafe); else delete onsafe; + op_finish(); return r < 0 ? -errno:r; } @@ -1317,6 +1346,7 @@ int FileStore::collection_setattr(coll_t c, const char *name, Context *onsafe) { int r; + op_start(); if (fake_attrs) r = attrs.collection_setattr(c, name, value, size, onsafe); else { @@ -1328,6 +1358,7 @@ int FileStore::collection_setattr(coll_t c, const char *name, journal_collection_setattr(c, name, value, size, onsafe); else delete onsafe; + op_finish(); return r < 0 ? -errno:r; } @@ -1335,6 +1366,7 @@ int FileStore::collection_rmattr(coll_t c, const char *name, Context *onsafe) { int r; + op_start(); if (fake_attrs) r = attrs.collection_rmattr(c, name, onsafe); else { @@ -1342,6 +1374,7 @@ int FileStore::collection_rmattr(coll_t c, const char *name, get_cdir(c, fn); r = do_removexattr(fn, name); } + op_finish(); return r < 0 ? -errno:r; } @@ -1379,6 +1412,7 @@ int FileStore::collection_getattr(coll_t c, const char *name, bufferlist& bl) int FileStore::collection_setattrs(coll_t cid, map& aset) { int r; + op_start(); if (fake_attrs) r = attrs.collection_setattrs(cid, aset); else { @@ -1394,6 +1428,7 @@ int FileStore::collection_setattrs(coll_t cid, map& aset) } if (r >= 0) journal_collection_setattrs(cid, aset, 0); + op_finish(); return r < 0 ? -errno:r; } @@ -1461,6 +1496,8 @@ int FileStore::create_collection(coll_t c, { if (fake_collections) return collections.create_collection(c, onsafe); + op_start(); + char fn[200]; get_cdir(c, fn); @@ -1470,6 +1507,7 @@ int FileStore::create_collection(coll_t c, journal_create_collection(c, onsafe); else delete onsafe; + op_finish(); return r < 0 ? -errno:r; } @@ -1478,6 +1516,8 @@ int FileStore::destroy_collection(coll_t c, { if (fake_collections) return collections.destroy_collection(c, onsafe); + op_start(); + char fn[200]; get_cdir(c, fn); char cmd[200]; @@ -1489,6 +1529,8 @@ int FileStore::destroy_collection(coll_t c, journal_destroy_collection(c, onsafe); else delete onsafe; + + op_finish(); return 0; } @@ -1515,6 +1557,7 @@ int FileStore::collection_add(coll_t c, coll_t cid, pobject_t o, Context *onsafe) { int r; + op_start(); if (fake_collections) r = collections.collection_add(c, o, onsafe); else { @@ -1528,6 +1571,7 @@ int FileStore::collection_add(coll_t c, coll_t cid, pobject_t o, journal_collection_add(c, cid, o, onsafe); else delete onsafe; + op_finish(); return r < 0 ? -errno:r; } @@ -1535,6 +1579,7 @@ int FileStore::collection_remove(coll_t c, pobject_t o, Context *onsafe) { int r; + op_start(); if (fake_collections) r = collections.collection_remove(c, o, onsafe); else { @@ -1546,6 +1591,7 @@ int FileStore::collection_remove(coll_t c, pobject_t o, journal_collection_remove(c, o, onsafe); else delete onsafe; + op_finish(); return r < 0 ? -errno:r; } diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc index 5fd1c5a94fd2a..ac720121d1521 100644 --- a/src/os/JournalingObjectStore.cc +++ b/src/os/JournalingObjectStore.cc @@ -11,7 +11,7 @@ int JournalingObjectStore::journal_replay() if (!journal) return 0; - int err = journal->open(super_epoch); + int err = journal->open(op_seq); if (err < 0) { dout(3) << "journal_replay open failed with" << err << " " << strerror(err) << dendl; @@ -23,26 +23,23 @@ int JournalingObjectStore::journal_replay() int count = 0; while (1) { bufferlist bl; - __u64 e; - if (!journal->read_entry(bl, e)) { + __u64 seq; + if (!journal->read_entry(bl, seq)) { dout(3) << "journal_replay: end of journal, done." << dendl; break; } - - if (e < super_epoch) { - dout(3) << "journal_replay: skipping old entry in epoch " << e << " < " << super_epoch << dendl; + + if (seq <= op_seq) { + dout(3) << "journal_replay: skipping old op seq " << seq << " <= " << op_seq << dendl; continue; } - if (e == super_epoch+1) { - super_epoch++; - dout(3) << "journal_replay: jumped to next epoch " << super_epoch << dendl; - } - assert(e == super_epoch); + assert(op_seq == seq-1); - dout(3) << "journal_replay: applying transaction in epoch " << e << dendl; + dout(3) << "journal_replay: applying op seq " << seq << dendl; Transaction t(bl); apply_transaction(t); - count++; + + assert(op_seq == seq); } // done reading, make writeable. diff --git a/src/os/JournalingObjectStore.h b/src/os/JournalingObjectStore.h index de43b37d1cd5b..020f49516e2ad 100644 --- a/src/os/JournalingObjectStore.h +++ b/src/os/JournalingObjectStore.h @@ -21,7 +21,8 @@ class JournalingObjectStore : public ObjectStore { protected: - epoch_t super_epoch; + __u64 op_seq; + __u64 committing_op_seq; Journal *journal; Finisher finisher; map > commit_waiters; @@ -48,197 +49,225 @@ protected: } void commit_start() { + // suspend new ops... op_lock.get_write(); - super_epoch++; + } + void commit_started() { + // allow new ops + // (underlying fs should now be committing all prior ops) + committing_op_seq = op_seq; op_lock.put_write(); } void commit_finish() { if (journal) - journal->committed_thru(super_epoch-1); - finisher.queue(commit_waiters[super_epoch-1]); + journal->committed_thru(committing_op_seq); + finisher.queue(commit_waiters[committing_op_seq]); } void queue_commit_waiter(Context *oncommit) { if (oncommit) - commit_waiters[super_epoch].push_back(oncommit); + commit_waiters[op_seq].push_back(oncommit); } void journal_transaction(Transaction &t, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { bufferlist tbl; t.encode(tbl); - journal->submit_entry(super_epoch, tbl, onsafe); + journal->submit_entry(op_seq, tbl, onsafe); + } else + queue_commit_waiter(onsafe); + } + void journal_transaction(bufferlist& tbl, Context *onsafe) { + ++op_seq; + if (journal && journal->is_writeable()) { + journal->submit_entry(op_seq, tbl, onsafe); } else queue_commit_waiter(onsafe); } void journal_write(coll_t cid, pobject_t oid, loff_t off, size_t len, const bufferlist& bl, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.write(cid, oid, off, len, bl); bufferlist tbl; t.encode(tbl); - journal->submit_entry(super_epoch, tbl, onsafe); + journal->submit_entry(op_seq, tbl, onsafe); } else queue_commit_waiter(onsafe); } void journal_zero(coll_t cid, pobject_t oid, loff_t off, size_t len, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.zero(cid, oid, off, len); bufferlist tbl; t.encode(tbl); - journal->submit_entry(super_epoch, tbl, onsafe); + journal->submit_entry(op_seq, tbl, onsafe); } else queue_commit_waiter(onsafe); } void journal_remove(coll_t cid, pobject_t oid, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.remove(cid, oid); bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } void journal_truncate(coll_t cid, pobject_t oid, loff_t size, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.truncate(cid, oid, size); bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } void journal_clone(coll_t cid, pobject_t from, pobject_t to, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.clone(cid, from, to); bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } void journal_setattr(coll_t cid, pobject_t oid, const char *name, const void *value, size_t size, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.setattr(cid, oid, name, value, size); bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } void journal_setattrs(coll_t cid, pobject_t oid, map& attrset, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.setattrs(cid, oid, attrset); bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } void journal_rmattr(coll_t cid, pobject_t oid, const char *name, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.rmattr(cid, oid, name); bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } void journal_create_collection(coll_t cid, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.create_collection(cid); bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } void journal_destroy_collection(coll_t cid, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.remove_collection(cid); bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } void journal_collection_add(coll_t cid, coll_t ocid, pobject_t oid, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.collection_add(cid, ocid, oid); bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } void journal_collection_remove(coll_t cid, pobject_t oid, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.collection_remove(cid, oid); bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } void journal_collection_setattr(coll_t cid, const char *name, const void *value, size_t size, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.collection_setattr(cid, name, value, size); bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } void journal_collection_setattrs(coll_t cid, map& aset, Context *onsafe) { + ++op_seq; if (journal && journal->is_writeable()) { Transaction t; t.collection_setattrs(cid, aset); bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } void journal_sync(Context *onsafe) { + ++op_seq; if (journal) { // journal empty transaction Transaction t; bufferlist bl; t.encode(bl); - journal->submit_entry(super_epoch, bl, onsafe); + journal->submit_entry(op_seq, bl, onsafe); } else queue_commit_waiter(onsafe); } public: - JournalingObjectStore() : super_epoch(0), journal(0) { } + JournalingObjectStore() : op_seq(0), committing_op_seq(0), journal(0) { } }; -- 2.39.5