From 41e1212bda54aafca964e06abbc04c1dc30835ad Mon Sep 17 00:00:00 2001 From: sageweil Date: Sun, 1 Jul 2007 14:35:29 +0000 Subject: [PATCH] merged trunk changes r1424:1461 into branches/sage/mon2 git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1462 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/mon2/Makefile | 3 +- branches/sage/mon2/ebofs/Ebofs.cc | 336 ++++++++++++++++----- branches/sage/mon2/ebofs/Ebofs.h | 40 ++- branches/sage/mon2/ebofs/FileJournal.cc | 373 ++++++++++++++++++++---- branches/sage/mon2/ebofs/FileJournal.h | 99 +++++-- branches/sage/mon2/ebofs/Journal.h | 16 +- branches/sage/mon2/ebofs/test.ebofs.cc | 3 +- branches/sage/mon2/ebofs/types.h | 13 +- branches/sage/mon2/include/buffer.h | 8 + branches/sage/mon2/osd/OSD.cc | 1 - branches/sage/mon2/osd/ObjectStore.h | 46 ++- 11 files changed, 761 insertions(+), 177 deletions(-) diff --git a/branches/sage/mon2/Makefile b/branches/sage/mon2/Makefile index 96c7a57d55227..5a9a77437fb8e 100644 --- a/branches/sage/mon2/Makefile +++ b/branches/sage/mon2/Makefile @@ -38,7 +38,8 @@ EBOFS_OBJS= \ ebofs/BlockDevice.o\ ebofs/BufferCache.o\ ebofs/Ebofs.o\ - ebofs/Allocator.o + ebofs/Allocator.o\ + ebofs/FileJournal.o MDS_OBJS= \ mds/MDS.o\ diff --git a/branches/sage/mon2/ebofs/Ebofs.cc b/branches/sage/mon2/ebofs/Ebofs.cc index a7c192b390100..f315d0385016f 100644 --- a/branches/sage/mon2/ebofs/Ebofs.cc +++ b/branches/sage/mon2/ebofs/Ebofs.cc @@ -16,6 +16,8 @@ #include "Ebofs.h" +#include "FileJournal.h" + #include #ifndef DARWIN @@ -50,6 +52,7 @@ int Ebofs::mount() ebofs_lock.Lock(); assert(!mounted); + // open dev int r = dev.open(&idle_kicker); if (r < 0) { ebofs_lock.Unlock(); @@ -79,6 +82,8 @@ int Ebofs::mount() dout(3) << "mount epoch " << super_epoch << endl; assert(super_epoch == sb->epoch); + super_fsid = sb->fsid; + free_blocks = sb->free_blocks; limbo_blocks = sb->limbo_blocks; @@ -101,6 +106,43 @@ int Ebofs::mount() allocator.release_limbo(); + + // open journal? + if (journalfn) { + journal = new FileJournal(this, journalfn); + if (journal->open() < 0) { + dout(-3) << "mount journal " << journalfn << " open failed" << endl; + delete journal; + journal = 0; + } else { + dout(-3) << "mount journal " << journalfn << " opened, replaying" << endl; + + while (1) { + bufferlist bl; + epoch_t e; + if (!journal->read_entry(bl, e)) { + dout(-3) << "mount replay: end of journal, done." << endl; + break; + } + + if (e < super_epoch) { + dout(-3) << "mount replay: skipping old entry in epoch " << e << " < " << super_epoch << endl; + } + if (e == super_epoch+1) { + super_epoch++; + dout(-3) << "mount replay: jumped to next epoch " << super_epoch << endl; + } + assert(e == super_epoch); + + dout(-3) << "mount replay: applying transaction in epoch " << e << endl; + Transaction t; + int off = 0; + t._decode(bl, off); + _apply_transaction(t); + } + } + } + dout(3) << "mount starting commit+finisher threads" << endl; commit_thread.create(); finisher_thread.create(); @@ -108,6 +150,7 @@ int Ebofs::mount() dout(1) << "mounted " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl; mounted = true; + ebofs_lock.Unlock(); return 0; } @@ -126,6 +169,10 @@ int Ebofs::mkfs() block_t num_blocks = dev.get_num_blocks(); + // make a super-random fsid + srand(time(0) ^ getpid()); + super_fsid = (lrand48() << 32) ^ mrand48(); + free_blocks = 0; limbo_blocks = 0; @@ -197,6 +244,18 @@ int Ebofs::mkfs() dev.close(); + + // create journal? + if (journalfn) { + journal = new FileJournal(this, journalfn); + if (journal->create() < 0) { + dout(3) << "mount journal " << journalfn << " created failed" << endl; + delete journal; + } else { + dout(3) << "mount journal " << journalfn << " created" << endl; + } + } + dout(2) << "mkfs: " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl; ebofs_lock.Unlock(); return 0; @@ -272,6 +331,7 @@ void Ebofs::prepare_super(version_t epoch, bufferptr& bp) // fill in super memset(&sb, 0, sizeof(sb)); sb.s_magic = EBOFS_MAGIC; + sb.fsid = super_fsid; sb.epoch = epoch; sb.num_blocks = dev.get_num_blocks(); @@ -409,6 +469,7 @@ int Ebofs::commit_thread_entry() << ", max dirty " << g_conf.ebofs_bc_max_dirty << endl; + if (journal) journal->commit_epoch_start(); // (async) write onodes+condes (do this first; it currently involves inode reallocation) commit_inodes_start(); @@ -453,14 +514,14 @@ int Ebofs::commit_thread_entry() alloc_more_node_space(); } + // signal journal + if (journal) journal->commit_epoch_finish(); + // kick waiters dout(10) << "commit_thread queueing commit + kicking sync waiters" << endl; - finisher_lock.Lock(); - finisher_queue.splice(finisher_queue.end(), commit_waiters[super_epoch-1]); + queue_finishers(commit_waiters[super_epoch-1]); commit_waiters.erase(super_epoch-1); - finisher_cond.Signal(); - finisher_lock.Unlock(); sync_cond.Signal(); @@ -1222,7 +1283,18 @@ void Ebofs::sync(Context *onsafe) ebofs_lock.Lock(); if (onsafe) { dirty = true; - commit_waiters[super_epoch].push_back(onsafe); + + while (1) { + if (journal) { + // journal empty transaction + Transaction t; + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + commit_waiters[super_epoch].push_back(onsafe); + break; + } } ebofs_lock.Unlock(); } @@ -1994,6 +2066,29 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) ebofs_lock.Lock(); dout(7) << "apply_transaction start (" << t.ops.size() << " ops)" << endl; + unsigned r = _apply_transaction(t); + + // journal, wait for commit + if (r != 0 && onsafe) { + delete onsafe; // kill callback, but still journal below (in case transaction had side effects) + onsafe = 0; + } + while (1) { + if (journal) { + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } + + ebofs_lock.Unlock(); + return r; +} + +unsigned Ebofs::_apply_transaction(Transaction& t) +{ // do ops unsigned r = 0; // bit fields indicate which ops failed. int bit = 1; @@ -2028,7 +2123,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_GETATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); pair pattrval = t.pattrvals.front(); t.pattrvals.pop_front(); if ((*(pattrval.second) = _getattr(oid, attrname, pattrval.first, *(pattrval.second))) < 0) { dout(7) << "apply_transaction fail on _getattr" << endl; @@ -2095,7 +2190,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_SETATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); //pair attrval = t.attrvals.front(); t.attrvals.pop_front(); bufferlist bl; bl.claim( t.attrbls.front() ); @@ -2121,7 +2216,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_RMATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); if (_rmattr(oid, attrname) < 0) { dout(7) << "apply_transaction fail on _rmattr" << endl; r &= bit; @@ -2185,7 +2280,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_COLL_SETATTR: { coll_t cid = t.cids.front(); t.cids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); //pair attrval = t.attrvals.front(); t.attrvals.pop_front(); bufferlist bl; bl.claim( t.attrbls.front() ); @@ -2201,7 +2296,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_COLL_RMATTR: { coll_t cid = t.cids.front(); t.cids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); if (_collection_rmattr(cid, attrname) < 0) { dout(7) << "apply_transaction fail on _collection_rmattr" << endl; r &= bit; @@ -2217,16 +2312,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) bit = bit << 1; } - dout(7) << "apply_transaction finish (r = " << r << ")" << endl; - - // set up commit waiter - //if (r == 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - //} else { - //if (onsafe) delete onsafe; - //} - - ebofs_lock.Unlock(); + dout(7) << "_apply_transaction finish (r = " << r << ")" << endl; return r; } @@ -2295,36 +2381,6 @@ int Ebofs::_write(object_t oid, off_t offset, size_t length, bufferlist& bl) } -/*int Ebofs::write(object_t oid, - off_t off, size_t len, - bufferlist& bl, bool fsync) -{ - // wait? - if (fsync) { - // wait for flush. - Cond cond; - bool done; - int flush = 1; // write never returns positive - Context *c = new C_Cond(&cond, &done, &flush); - int r = write(oid, off, len, bl, c); - if (r < 0) return r; - - ebofs_lock.Lock(); - { - while (!done) - cond.Wait(ebofs_lock); - assert(flush <= 0); - } - ebofs_lock.Unlock(); - if (flush < 0) return flush; - return r; - } else { - // don't wait for flush. - return write(oid, off, len, bl, (Context*)0); - } -} -*/ - int Ebofs::write(object_t oid, off_t off, size_t len, bufferlist& bl, Context *onsafe) @@ -2338,7 +2394,17 @@ int Ebofs::write(object_t oid, // commit waiter if (r > 0) { assert((size_t)r == len); - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.write(oid, off, len, bl); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2370,9 +2436,19 @@ int Ebofs::remove(object_t oid, Context *onsafe) // do it int r = _remove(oid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.remove(oid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2445,9 +2521,19 @@ int Ebofs::truncate(object_t oid, off_t size, Context *onsafe) int r = _truncate(oid, size); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.truncate(oid, size); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2464,9 +2550,19 @@ int Ebofs::clone(object_t from, object_t to, Context *onsafe) int r = _clone(from, to); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.clone(from, to); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2640,9 +2736,19 @@ int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t siz ebofs_lock.Lock(); int r = _setattr(oid, name, value, size); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.setattr(oid, name, value, size); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2673,9 +2779,19 @@ int Ebofs::setattrs(object_t oid, map& attrset, Context *onsaf ebofs_lock.Lock(); int r = _setattrs(oid, attrset); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.setattrs(oid, attrset); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2758,9 +2874,19 @@ int Ebofs::rmattr(object_t oid, const char *name, Context *onsafe) int r = _rmattr(oid, name); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.rmattr(oid, name); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2835,9 +2961,19 @@ int Ebofs::create_collection(coll_t cid, Context *onsafe) int r = _create_collection(cid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.create_collection(cid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2882,9 +3018,19 @@ int Ebofs::destroy_collection(coll_t cid, Context *onsafe) int r = _destroy_collection(cid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.remove_collection(cid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2936,9 +3082,19 @@ int Ebofs::collection_add(coll_t cid, object_t oid, Context *onsafe) int r = _collection_add(cid, oid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.collection_add(cid, oid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2977,9 +3133,19 @@ int Ebofs::collection_remove(coll_t cid, object_t oid, Context *onsafe) int r = _collection_remove(cid, oid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.collection_remove(cid, oid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -3040,9 +3206,19 @@ int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, s int r = _collection_setattr(cid, name, value, size); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.collection_setattr(cid, name, value, size); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -3098,9 +3274,19 @@ int Ebofs::collection_rmattr(coll_t cid, const char *name, Context *onsafe) int r = _collection_rmattr(cid, name); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.collection_rmattr(cid, name); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } diff --git a/branches/sage/mon2/ebofs/Ebofs.h b/branches/sage/mon2/ebofs/Ebofs.h index 91c5eb51b3cda..eb20cf8920531 100644 --- a/branches/sage/mon2/ebofs/Ebofs.h +++ b/branches/sage/mon2/ebofs/Ebofs.h @@ -29,6 +29,7 @@ using namespace __gnu_cxx; #include "nodes.h" #include "Allocator.h" #include "Table.h" +#include "Journal.h" #include "common/Mutex.h" #include "common/Cond.h" @@ -40,20 +41,23 @@ typedef pair coll_object_t; class Ebofs : public ObjectStore { - protected: +protected: Mutex ebofs_lock; // a beautiful global lock // ** debuggy ** bool fake_writes; // ** super ** +public: BlockDevice dev; +protected: bool mounted, unmounting, dirty; bool readonly; version_t super_epoch; bool commit_thread_started, mid_commit; Cond commit_cond; // to wake up the commit thread Cond sync_cond; + uint64_t super_fsid; map > commit_waiters; @@ -71,9 +75,16 @@ class Ebofs : public ObjectStore { } } commit_thread; - +public: + uint64_t get_fsid() { return super_fsid; } + epoch_t get_super_epoch() { return super_epoch; } +protected: + // ** journal ** + char *journalfn; + Journal *journal; + // ** allocator ** block_t free_blocks, limbo_blocks; Allocator allocator; @@ -188,6 +199,21 @@ class Ebofs : public ObjectStore { bool finisher_stop; list finisher_queue; +public: + void queue_finisher(Context *c) { + finisher_lock.Lock(); + finisher_queue.push_back(c); + finisher_cond.Signal(); + finisher_lock.Unlock(); + } + void queue_finishers(list& ls) { + finisher_lock.Lock(); + finisher_queue.splice(finisher_queue.end(), ls); + finisher_cond.Signal(); + finisher_lock.Unlock(); + } +protected: + void *finisher_thread_entry(); class FinisherThread : public Thread { Ebofs *ebofs; @@ -204,12 +230,13 @@ class Ebofs : public ObjectStore { public: - Ebofs(char *devfn) : + Ebofs(char *devfn, char *jfn=0) : fake_writes(false), dev(devfn), mounted(false), unmounting(false), dirty(false), readonly(false), super_epoch(0), commit_thread_started(false), mid_commit(false), commit_thread(this), + journalfn(jfn), journal(0), free_blocks(0), limbo_blocks(0), allocator(this), nodepool(ebofs_lock), @@ -222,6 +249,11 @@ class Ebofs : public ObjectStore { finisher_stop(false), finisher_thread(this) { for (int i=0; i + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ #include "FileJournal.h" #include "Ebofs.h" -#include "config.h" -#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ").journal " -#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ").journal " +#include +#include +#include +#include + +#include "config.h" +#undef dout +#define dout(x) if (true || x <= g_conf.debug_ebofs) cout << "ebofs(" << ebofs->dev.get_device_name() << ").journal " +#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << ebofs->dev.get_device_name() << ").journal " -void FileJournal::create() +int FileJournal::create() { dout(1) << "create " << fn << endl; // open/create - fd = ::open(fn.c_str(), O_CREAT|O_WRONLY); + fd = ::open(fn.c_str(), O_RDWR|O_SYNC); + if (fd < 0) { + dout(1) << "create failed " << errno << " " << strerror(errno) << endl; + return -errno; + } assert(fd > 0); - ::ftruncate(fd); - ::fchmod(fd, 0644); + //::ftruncate(fd, 0); + //::fchmod(fd, 0644); + + // get size + struct stat st; + ::fstat(fd, &st); + dout(1) << "open " << fn << " " << st.st_size << " bytes" << endl; + + // write empty header + header.clear(); + header.fsid = ebofs->get_fsid(); + header.max_size = st.st_size; + write_header(); + + // writeable. + read_pos = 0; + write_pos = queue_pos = sizeof(header); ::close(fd); -} + return 0; +} -void FileJournal::open() +int FileJournal::open() { - dout(1) << "open " << fn << endl; + //dout(1) << "open " << fn << endl; // open and file assert(fd == 0); - fd = ::open(fn.c_str(), O_RDWR); + fd = ::open(fn.c_str(), O_RDWR|O_SYNC); + if (fd < 0) { + dout(1) << "open failed " << errno << " " << strerror(errno) << endl; + return -errno; + } assert(fd > 0); - // read header? - // *** + // assume writeable, unless... + read_pos = 0; + write_pos = queue_pos = sizeof(header); + // read header? + read_header(); + if (header.num > 0 && header.fsid == ebofs->get_fsid()) { + // valid header, pick an offset + for (int i=0; iget_super_epoch()) { + dout(2) << "using read_pos header pointer " + << header.epoch[i] << " at " << header.offset[i] + << endl; + read_pos = header.offset[i]; + write_pos = queue_pos = 0; + break; + } + else if (header.epoch[i] < ebofs->get_super_epoch()) { + dout(2) << "super_epoch is " << ebofs->get_super_epoch() + << ", skipping old " << header.epoch[i] << " at " << header.offset[i] + << endl; + } + else if (header.epoch[i] > ebofs->get_super_epoch()) { + dout(2) << "super_epoch is " << ebofs->get_super_epoch() + << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i] + << endl; + break; + } + } + } start_writer(); + + return 0; } void FileJournal::close() @@ -49,7 +119,8 @@ void FileJournal::close() stop_writer(); // close - assert(q.empty()); + assert(writeq.empty()); + assert(commitq.empty()); assert(fd > 0); ::close(fd); fd = 0; @@ -73,12 +144,36 @@ void FileJournal::stop_writer() } +void FileJournal::print_header() +{ + for (int i=0; i::const_iterator it = bl.buffers().begin(); it != bl.buffers().end(); @@ -124,14 +231,21 @@ void FileJournal::write_thread_entry() if ((*it).length() == 0) continue; // blank buffer. ::write(fd, (char*)(*it).c_str(), (*it).length() ); } + + ::write(fd, &h, sizeof(h)); // move position pointer - bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length(); + write_pos += 2*sizeof(entry_header_t) + bl.length(); - // do commit callback if (oncommit) { - oncommit->finish(0); - delete oncommit; + if (1) { + // queue callback + ebofs->queue_finisher(oncommit); + } else { + // callback now + oncommit->finish(0); + delete oncommit; + } } } } @@ -140,61 +254,202 @@ void FileJournal::write_thread_entry() dout(10) << "write_thread_entry finish" << endl; } -void FileJournal::submit_entry(bufferlist& e, Context *oncommit) +bool FileJournal::submit_entry(bufferlist& e, Context *oncommit) { - dout(10) << "submit_entry " << bottom << " : " << e.length() - << " epoch " << ebofs->super_epoch + assert(queue_pos != 0); // bad create(), or journal didn't replay to completion. + + // ** lock ** + Mutex::Locker locker(write_lock); + + // wrap? full? + off_t size = 2*sizeof(entry_header_t) + e.length(); + + if (full) return false; // already marked full. + + if (header.wrap) { + // we're wrapped. don't overwrite ourselves. + if (queue_pos + size >= header.offset[0]) { + dout(10) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size + << " >= " << header.offset[0] + << endl; + full = true; + print_header(); + return false; + } + } else { + // we haven't wrapped. + if (queue_pos + size >= header.max_size) { + // is there room if we wrap? + if ((off_t)sizeof(header_t) + size < header.offset[0]) { + // yes! + dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << endl; + header.wrap = queue_pos; + queue_pos = sizeof(header_t); + header.push(ebofs->get_super_epoch(), queue_pos); + } else { + // no room. + dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size + << " >= " << header.max_size + << endl; + full = true; + return false; + } + } + } + + dout(10) << "submit_entry " << queue_pos << " : " << e.length() + << " epoch " << ebofs->get_super_epoch() << " " << oncommit << endl; // dump on queue - writeq.push_back(pair(ebofs->super_epoch, e)); + writeq.push_back(pair(ebofs->get_super_epoch(), e)); commitq.push_back(oncommit); - + + queue_pos += size; + // kick writer thread write_cond.Signal(); + + return true; } void FileJournal::commit_epoch_start() { - dout(10) << "commit_epoch_start" << endl; + dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1 + << " -- new epoch " << ebofs->get_super_epoch() + << endl; - write_lock.Lock(); - { - header.epoch2 = ebofs->super_epoch; - header.top2 = bottom; - write_header(); - } - write_lock.Unlock(); + Mutex::Locker locker(write_lock); + + // was full -> empty -> now usable? + if (full) { + if (header.num != 0) { + dout(1) << " journal FULL, ignoring this epoch" << endl; + return; + } + + dout(1) << " clearing FULL flag, journal now usable" << endl; + full = false; + } + + // note epoch boundary + header.push(ebofs->get_super_epoch(), queue_pos); // note: these entries may not yet be written. + //write_header(); // no need to write it now, though... } void FileJournal::commit_epoch_finish() { - dout(10) << "commit_epoch_finish" << endl; + dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << endl; write_lock.Lock(); { - // update header - header.epoch1 = ebofs->super_epoch; - header.top1 = header.top2; - header.epoch2 = 0; - header.top2 = 0; + if (full) { + // full journal damage control. + dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << endl; + header.clear(); + write_pos = queue_pos = sizeof(header_t); + } else { + // update header -- trim/discard old (committed) epochs + while (header.epoch[0] < ebofs->get_super_epoch()) + header.pop(); + } write_header(); - // flush any unwritten items in previous epoch - while (!writeq.empty() && - writeq.front().first < ebofs->super_epoch) { - dout(15) << " dropping uncommitted journal item from prior epoch" << endl; - writeq.pop_front(); + // discard any unwritten items in previous epoch, and do callbacks + epoch_t epoch = ebofs->get_super_epoch(); + list callbacks; + while (!writeq.empty() && writeq.front().first < epoch) { + dout(15) << " dropping unwritten and committed " + << write_pos << " : " << writeq.front().second.length() + << " epoch " << writeq.front().first + << endl; + // finisher? Context *oncommit = commitq.front(); + if (oncommit) callbacks.push_back(oncommit); + + write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length(); + + // discard. + writeq.pop_front(); commitq.pop_front(); - - if (oncommit) { - oncommit->finish(0); - delete oncommit; - } } + + // queue the finishers + ebofs->queue_finishers(callbacks); } write_lock.Unlock(); } + + +void FileJournal::make_writeable() +{ + if (read_pos) + write_pos = queue_pos = read_pos; + else + write_pos = queue_pos = sizeof(header_t); + read_pos = 0; +} + + +bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch) +{ + if (!read_pos) { + dout(1) << "read_entry -- not readable" << endl; + make_writeable(); + return false; + } + + if (read_pos == header.wrap) { + // find wrap point + for (int i=1; i= 2 && offset[0] > offset[1]) + wrap = 0; // we're eliminating a wrap + num--; + for (int i=0; iwrite_thread(); + journal->write_thread_entry(); return 0; } } write_thread; public: - FileJournal(Ebofs *e, char *f, off_t sz) : - Journal(e), - fn(f), max_size(sz), - top(0), bottom(0), committing_to(0), + FileJournal(Ebofs *e, char *f) : + Journal(e), fn(f), + full(false), + write_pos(0), queue_pos(0), read_pos(0), fd(0), - write_stop(false), write_thread(this) - { } + write_stop(false), write_thread(this) { } ~FileJournal() {} - void create(); - void open(); + int create(); + int open(); void close(); // writes - void submit_entry(bufferlist& e, Context *oncommit); // submit an item - void commit_epoch_start(); // mark epoch boundary - void commit_epoch_finish(); // mark prior epoch as committed (we can expire) + bool submit_entry(bufferlist& e, Context *oncommit); // submit an item + void commit_epoch_start(); // mark epoch boundary + void commit_epoch_finish(); // mark prior epoch as committed (we can expire) + + bool read_entry(bufferlist& bl, epoch_t& e); // reads }; diff --git a/branches/sage/mon2/ebofs/Journal.h b/branches/sage/mon2/ebofs/Journal.h index c05bce5955c5f..fb1983c22eafc 100644 --- a/branches/sage/mon2/ebofs/Journal.h +++ b/branches/sage/mon2/ebofs/Journal.h @@ -16,22 +16,28 @@ #ifndef __EBOFS_JOURNAL_H #define __EBOFS_JOURNAL_H +class Ebofs; + +#include "include/buffer.h" +#include "include/Context.h" class Journal { +protected: Ebofs *ebofs; - public: +public: Journal(Ebofs *e) : ebofs(e) { } virtual ~Journal() { } - virtual void create() = 0; - virtual void open() = 0; + virtual int create() = 0; + virtual int open() = 0; virtual void close() = 0; // writes - virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item + virtual bool submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item virtual void commit_epoch_start() = 0; // mark epoch boundary - virtual void commit_epoch_finish(list& ls) = 0; // mark prior epoch as committed (we can expire) + virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire) + virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0; // reads/recovery diff --git a/branches/sage/mon2/ebofs/test.ebofs.cc b/branches/sage/mon2/ebofs/test.ebofs.cc index 704ec16581824..345f49b7a68ca 100644 --- a/branches/sage/mon2/ebofs/test.ebofs.cc +++ b/branches/sage/mon2/ebofs/test.ebofs.cc @@ -145,6 +145,7 @@ int main(int argc, char **argv) char *filename = args[0]; int seconds = atoi(args[1]); int threads = atoi(args[2]); + if (!threads) threads = 1; cout << "dev " << filename << " .. " << threads << " threads .. " << seconds << " seconds" << endl; @@ -153,7 +154,7 @@ int main(int argc, char **argv) // explicit tests - if (1) { + if (0) { // verify that clone() plays nice with partial writes object_t oid(1,1); bufferptr bp(10000); diff --git a/branches/sage/mon2/ebofs/types.h b/branches/sage/mon2/ebofs/types.h index b03bb8a40d9c9..1fa209a3deeb9 100644 --- a/branches/sage/mon2/ebofs/types.h +++ b/branches/sage/mon2/ebofs/types.h @@ -142,15 +142,16 @@ static const int EBOFS_FREE_BUCKET_BITS = 2; struct ebofs_super { - unsigned s_magic; - - unsigned epoch; // version of this superblock. + uint64_t s_magic; + uint64_t fsid; + + epoch_t epoch; // version of this superblock. - unsigned num_blocks; /* # blocks in filesystem */ + uint64_t num_blocks; /* # blocks in filesystem */ // some basic stats, for kicks - unsigned free_blocks; /* unused blocks */ - unsigned limbo_blocks; /* limbo blocks */ + uint64_t free_blocks; /* unused blocks */ + uint64_t limbo_blocks; /* limbo blocks */ //unsigned num_objects; //unsigned num_fragmented; diff --git a/branches/sage/mon2/include/buffer.h b/branches/sage/mon2/include/buffer.h index 1f401513f688c..1f61d3b892ba5 100644 --- a/branches/sage/mon2/include/buffer.h +++ b/branches/sage/mon2/include/buffer.h @@ -919,6 +919,14 @@ inline void _decode(std::string& s, bufferlist& bl, int& off) off += len+1; } +// const char* (encode only, string compatible) +inline void _encode(const char *s, bufferlist& bl) +{ + uint32_t len = strlen(s); + _encoderaw(len, bl); + bl.append(s, len+1); +} + // bufferptr (encapsulated) inline void _encode(bufferptr& bp, bufferlist& bl) { diff --git a/branches/sage/mon2/osd/OSD.cc b/branches/sage/mon2/osd/OSD.cc index 1d6c7a3193c7f..a857a068d76a3 100644 --- a/branches/sage/mon2/osd/OSD.cc +++ b/branches/sage/mon2/osd/OSD.cc @@ -3401,7 +3401,6 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t, switch (op->get_op()) { case OSD_OP_WRLOCK: { // lock object - //r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit); t.setattr(oid, "wrlock", &op->get_client(), sizeof(entity_name_t)); } break; diff --git a/branches/sage/mon2/osd/ObjectStore.h b/branches/sage/mon2/osd/ObjectStore.h index 3a3cec32515ff..74818e0470526 100644 --- a/branches/sage/mon2/osd/ObjectStore.h +++ b/branches/sage/mon2/osd/ObjectStore.h @@ -102,14 +102,29 @@ public: list offsets; list lengths; list attrnames; + list attrnames2; //list< pair > attrvals; list attrbls; + // for reads only (not encoded) list pbls; list psts; list< pair > pattrvals; list< map* > pattrsets; + const char *get_attrname() { + if (attrnames.empty()) + return attrnames2.front().c_str(); + else + return attrnames.front(); + } + void pop_attrname() { + if (attrnames.empty()) + attrnames2.pop_front(); + else + attrnames.pop_front(); + } + void read(object_t oid, off_t off, size_t len, bufferlist *pbl) { int op = OP_READ; ops.push_back(op); @@ -232,6 +247,27 @@ public: } // etc. + + void _encode(bufferlist& bl) { + ::_encode(ops, bl); + ::_encode(bls, bl); + ::_encode(oids, bl); + ::_encode(cids, bl); + ::_encode(offsets, bl); + ::_encode(lengths, bl); + ::_encode(attrnames, bl); + ::_encode(attrbls, bl); + } + void _decode(bufferlist& bl, int& off) { + ::_decode(ops, bl, off); + ::_decode(bls, bl, off); + ::_decode(oids, bl, off); + ::_decode(cids, bl, off); + ::_decode(offsets, bl, off); + ::_decode(lengths, bl, off); + ::_decode(attrnames2, bl, off); + ::_decode(attrbls, bl, off); + } }; @@ -264,7 +300,7 @@ public: case Transaction::OP_GETATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); pair pattrval = t.pattrvals.front(); t.pattrvals.pop_front(); *pattrval.second = getattr(oid, attrname, pattrval.first, *pattrval.second); } @@ -314,7 +350,7 @@ public: case Transaction::OP_SETATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); //pair attrval = t.attrvals.front(); t.attrvals.pop_front(); bufferlist bl; bl.claim( t.attrbls.front() ); @@ -333,7 +369,7 @@ public: case Transaction::OP_RMATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); rmattr(oid, attrname, 0); } break; @@ -379,7 +415,7 @@ public: case Transaction::OP_COLL_SETATTR: { coll_t cid = t.cids.front(); t.cids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); //pair attrval = t.attrvals.front(); t.attrvals.pop_front(); bufferlist bl; bl.claim( t.attrbls.front() ); @@ -391,7 +427,7 @@ public: case Transaction::OP_COLL_RMATTR: { coll_t cid = t.cids.front(); t.cids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); collection_rmattr(cid, attrname, 0); } break; -- 2.39.5