From 5fe44f34e779ec31d90924c0b4bbfd85e6f579ea Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 24 Jan 2008 21:59:38 -0800 Subject: [PATCH] journal replay, makefile cleanup --- src/Makefile | 2 ++ src/ebofs/FileJournal.cc | 47 ++++++++++++++++++++++---------- src/ebofs/FileJournal.h | 4 +-- src/ebofs/mkfs.ebofs.cc | 6 ++-- src/ebofs/streamtest.cc | 59 ++++++++++++++++++++++++++++++---------- src/ebofs/test.ebofs.cc | 8 +++--- 6 files changed, 87 insertions(+), 39 deletions(-) diff --git a/src/Makefile b/src/Makefile index 6f383bcb49868..ab1ee7d61df51 100644 --- a/src/Makefile +++ b/src/Makefile @@ -226,6 +226,8 @@ streamtest.ebofs: ebofs/streamtest.cc config.cc common/Clock.o ebofs.o dupstore: dupstore.cc config.cc ebofs.o common/Clock.o common/Timer.o osd/FakeStore.o ${CXX} ${CFLAGS} ${LIBS} $^ -o $@ +allebofs: mkfs.ebofs test.ebofs streamtest.ebofs dupstore + # hadoop libhadoopcephfs.so: client/hadoop/CephFSInterface.cc client.o osdc.o msg/SimpleMessenger.o common.o diff --git a/src/ebofs/FileJournal.cc b/src/ebofs/FileJournal.cc index af2111a0ef175..73a1009bdbdb2 100644 --- a/src/ebofs/FileJournal.cc +++ b/src/ebofs/FileJournal.cc @@ -27,17 +27,24 @@ #define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal " -int FileJournal::_open() +int FileJournal::_open(bool forwrite) { - int flags = O_RDWR; - if (directio) flags |= O_DIRECT; + int flags; + + if (forwrite) { + flags = O_RDONLY; + } else { + flags = O_RDWR; + if (directio) flags |= O_DIRECT; + } + if (fd >= 0) + ::close(fd); fd = ::open(fn.c_str(), flags); if (fd < 0) { dout(2) << "_open failed " << errno << " " << strerror(errno) << dendl; return -errno; } - assert(fd > 0); // get size struct stat st; @@ -53,7 +60,7 @@ int FileJournal::create() { dout(2) << "create " << fn << dendl; - int err = _open(); + int err = _open(true); if (err < 0) return err; // write empty header @@ -69,6 +76,7 @@ int FileJournal::create() write_pos = get_top(); ::close(fd); + fd = -1; dout(2) << "create done" << dendl; return 0; } @@ -77,7 +85,7 @@ int FileJournal::open() { dout(2) << "open " << fn << dendl; - int err = _open(); + int err = _open(false); if (err < 0) return err; // assume writeable, unless... @@ -121,8 +129,6 @@ int FileJournal::open() } } } - write_header(); - start_writer(); return 0; } @@ -139,7 +145,7 @@ void FileJournal::close() assert(commitq.empty()); assert(fd > 0); ::close(fd); - fd = 0; + fd = -1; } void FileJournal::start_writer() @@ -311,7 +317,6 @@ void FileJournal::prepare_multi_write(bufferlist& bl) } } - bool FileJournal::prepare_single_dio_write(bufferlist& bl) { // grab next item @@ -359,6 +364,8 @@ void FileJournal::do_write(bufferlist& bl) writing = true; + header_t old_header = header; + write_lock.Unlock(); dout(15) << "do_write writing " << write_pos << "~" << bl.length() @@ -378,12 +385,18 @@ void FileJournal::do_write(bufferlist& bl) ::pwrite(fd, (char*)(*it).c_str(), (*it).length(), pos); pos += (*it).length(); } + ::fdatasync(fd); write_lock.Lock(); writing = false; - write_pos += bl.length(); - ebofs->queue_finishers(writingq); + if (memcmp(&old_header, &header, sizeof(header)) == 0) { + write_pos += bl.length(); + ebofs->queue_finishers(writingq); + } else { + derr(0) << "do_write finished write but header changed? not moving write_pos." << dendl; + assert(writingq.empty()); + } } @@ -451,7 +464,7 @@ void FileJournal::commit_epoch_start() dout(1) << " journal FULL, ignoring this epoch" << dendl; return; } - + dout(1) << " clearing FULL flag, journal now usable" << dendl; full = false; } @@ -506,11 +519,16 @@ void FileJournal::commit_epoch_finish(epoch_t new_epoch) void FileJournal::make_writeable() { + _open(true); + if (read_pos) write_pos = read_pos; else write_pos = get_top(); read_pos = 0; + + must_write_header = true; + start_writer(); } @@ -536,8 +554,7 @@ bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch) // header entry_header_t h; - ::lseek(fd, read_pos, SEEK_SET); - ::read(fd, &h, sizeof(h)); + ::pread(fd, &h, sizeof(h), read_pos); if (!h.check_magic(read_pos, header.fsid)) { dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl; return false; diff --git a/src/ebofs/FileJournal.h b/src/ebofs/FileJournal.h index 9365645ffdf76..a0a5cc40ad33a 100644 --- a/src/ebofs/FileJournal.h +++ b/src/ebofs/FileJournal.h @@ -116,7 +116,7 @@ private: Cond write_cond; bool write_stop; - int _open(); + int _open(bool wr); void print_header(); void read_header(); bufferptr prepare_header(); @@ -154,7 +154,7 @@ private: directio(dio), full(false), writing(false), must_write_header(false), write_pos(0), read_pos(0), - fd(0), + fd(-1), write_stop(false), write_thread(this) { } ~FileJournal() {} diff --git a/src/ebofs/mkfs.ebofs.cc b/src/ebofs/mkfs.ebofs.cc index d1d5975e7fd65..2f29ab93cd4ff 100644 --- a/src/ebofs/mkfs.ebofs.cc +++ b/src/ebofs/mkfs.ebofs.cc @@ -18,10 +18,10 @@ #include "ebofs/Ebofs.h" -int main(int argc, char **argv) +int main(int argc, const char **argv) { // args - vector args; + vector args; argv_to_vec(argc, argv, args); parse_config_options(args); @@ -29,7 +29,7 @@ int main(int argc, char **argv) cerr << "usage: mkfs.ebofs [options] " << std::endl; return -1; } - char *filename = args[0]; + const char *filename = args[0]; // mkfs Ebofs mfs(filename); diff --git a/src/ebofs/streamtest.cc b/src/ebofs/streamtest.cc index 41470591e0ded..144d52b1d9f0a 100644 --- a/src/ebofs/streamtest.cc +++ b/src/ebofs/streamtest.cc @@ -17,20 +17,55 @@ #include #include "ebofs/Ebofs.h" -map > writes; +struct io { + utime_t start, ack, commit; + bool done() { + return ack.sec() && commit.sec(); + } +}; +map writes; Mutex lock; + +void pr(off_t off) +{ + io &i = writes[off]; + dout(0) << off << "\t" + << (i.ack - i.start) << "\t" + << (i.commit - i.start) << dendl; + writes.erase(off); +} + +void set_start(off_t off, utime_t t) +{ + Mutex::Locker l(lock); + writes[off].start = t; +} + +void set_ack(off_t off, utime_t t) +{ + Mutex::Locker l(lock); + writes[off].ack = t; + if (writes[off].done()) + pr(off); +} + +void set_commit(off_t off, utime_t t) +{ + Mutex::Locker l(lock); + writes[off].commit = t; + if (writes[off].done()) + pr(off); +} + + struct C_Commit : public Context { off_t off; C_Commit(off_t o) : off(o) {} void finish(int r) { Mutex::Locker l(lock); - utime_t now = g_clock.now(); - dout(0) << off << "\t" - << (writes[off].second-writes[off].first) << "\t" - << (now - writes[off].first) << dendl; - writes.erase(off); + set_commit(off, g_clock.now()); } }; @@ -78,17 +113,11 @@ int main(int argc, const char **argv) cout << "# offset\tack\tcommit" << std::endl; while (now < end) { object_t oid(1,1); - utime_t start; - { - Mutex::Locker l(lock); - start = writes[pos].first = now; - } + utime_t start = now; + set_start(pos, now); fs.write(oid, pos, bytes, bl, new C_Commit(pos)); now = g_clock.now(); - { - Mutex::Locker l(lock); - writes[pos].second = now; - } + set_ack(pos, now); pos += bytes; // wait? diff --git a/src/ebofs/test.ebofs.cc b/src/ebofs/test.ebofs.cc index dd78b8ff9a11e..8e4bfcdbd4597 100644 --- a/src/ebofs/test.ebofs.cc +++ b/src/ebofs/test.ebofs.cc @@ -44,7 +44,7 @@ public: coll_t cid = rand() % 50; off_t off = rand() % 10000;//0;//rand() % 1000000; off_t len = 1+rand() % 100000; - char *a = "one"; + const char *a = "one"; if (rand() % 2) a = "two"; int l = 3;//rand() % 10; @@ -150,15 +150,15 @@ public: } }; -int main(int argc, char **argv) +int main(int argc, const char **argv) { - vector args; + vector args; argv_to_vec(argc, argv, args); parse_config_options(args); // args if (args.size() != 3) return -1; - char *filename = args[0]; + const char *filename = args[0]; int seconds = atoi(args[1]); int threads = atoi(args[2]); if (!threads) threads = 1; -- 2.39.5