From 12ed9df2ca3dc83983081714968ab25057fef99d Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 1 Jun 2007 21:17:34 +0000 Subject: [PATCH] * some edits git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1388 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/ebofs/FileJournal.cc | 171 ++++++++++++++++++-------------- trunk/ceph/ebofs/FileJournal.h | 41 +++++--- 2 files changed, 124 insertions(+), 88 deletions(-) diff --git a/trunk/ceph/ebofs/FileJournal.cc b/trunk/ceph/ebofs/FileJournal.cc index 5f38c7725ea8c..87ea20199cd24 100644 --- a/trunk/ceph/ebofs/FileJournal.cc +++ b/trunk/ceph/ebofs/FileJournal.cc @@ -57,119 +57,144 @@ void FileJournal::close() void FileJournal::start_writer() { - writer_stop = false; + write_stop = false; write_thread.create(); } void FileJournal::stop_writer() { - writer_lock.Lock(); + write_lock.Lock(); { - writer_stop = true; - writer_cond.Signal(); + write_stop = true; + write_cond.Signal(); } - writer_lock.Unlock(); + write_lock.Unlock(); write_thread.join(); } + +void FileJournal::write_header() +{ + dout(10) << "write_header" << endl; + + ::lseek(fd, 0, SEEK_SET); + ::write(fd, &header, sizeof(header)); +} + + void FileJournal::write_thread_entry() { dout(10) << "write_thread_entry start" << endl; - writer_lock.Lock(); + write_lock.Lock(); while (!write_stop) { - if (writeq.empty()) { - // sleep - dout(20) << "write_thread_entry going to sleep" << endl; - write_cond.Wait(writer_lock); - dout(20) << "write_thread_entry woke up" << endl; - continue; - } - - // do queued writes - while (!writeq.empty()) { - // grab next item - epoch_t e = writeq.front().first; - bufferlist bl; - bl.claim(writeq.front().second); - writeq.pop_front(); - Context *oncommit = commitq.front(); - commitq.pop_front(); - - dout(15) << "write_thread_entry writing " << bottom << " : " - << bl.length() - << " epoch " << e - << endl; - - // write epoch, len, data. - ::fseek(fd, bottom, SEEK_SET); - ::write(fd, &e, sizeof(e)); - - uint32_t len = bl.length(); - ::write(fd, &len, sizeof(len)); - - for (list::const_iterator it = bl.buffers().begin(); - it != bl.buffers().end(); - it++) { - if ((*it).length() == 0) continue; // blank buffer. - ::write(fd, (char*)(*it).c_str(), (*it).length() ); - } - - // move position pointer - bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length(); - - // do commit callback - if (oncommit) { - oncommit->finish(0); - delete oncommit; - } - } + if (writeq.empty()) { + // sleep + dout(20) << "write_thread_entry going to sleep" << endl; + write_cond.Wait(write_lock); + dout(20) << "write_thread_entry woke up" << endl; + continue; + } + + // do queued writes + while (!writeq.empty()) { + // grab next item + epoch_t e = writeq.front().first; + bufferlist bl; + bl.claim(writeq.front().second); + writeq.pop_front(); + Context *oncommit = commitq.front(); + commitq.pop_front(); + + dout(15) << "write_thread_entry writing " << bottom << " : " + << bl.length() + << " epoch " << e + << endl; + + // write epoch, len, data. + ::fseek(fd, bottom, SEEK_SET); + ::write(fd, &e, sizeof(e)); + + uint32_t len = bl.length(); + ::write(fd, &len, sizeof(len)); + + for (list::const_iterator it = bl.buffers().begin(); + it != bl.buffers().end(); + it++) { + if ((*it).length() == 0) continue; // blank buffer. + ::write(fd, (char*)(*it).c_str(), (*it).length() ); + } + + // move position pointer + bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length(); + + // do commit callback + if (oncommit) { + oncommit->finish(0); + delete oncommit; + } + } } - writer_lock.Unlock(); + write_lock.Unlock(); dout(10) << "write_thread_entry finish" << endl; } void FileJournal::submit_entry(bufferlist& e, Context *oncommit) { dout(10) << "submit_entry " << bottom << " : " << e.length() - << " epoch " << ebofs->super_epoch - << " " << oncommit << endl; - + << " epoch " << ebofs->super_epoch + << " " << oncommit << endl; + // dump on queue writeq.push_back(pair(ebofs->super_epoch, e)); commitq.push_back(oncommit); // kick writer thread - writer_cond.Signal(); + write_cond.Signal(); } void FileJournal::commit_epoch_start() { dout(10) << "commit_epoch_start" << endl; + + write_lock.Lock(); + { + header.epoch2 = ebofs->super_epoch; + header.top2 = bottom; + write_header(); + } + write_lock.Unlock(); } void FileJournal::commit_epoch_finish() { dout(10) << "commit_epoch_finish" << endl; - - // flush any unwritten items in previous epoch - writer_lock.Lock(); + + write_lock.Lock(); { - while (!writeq.empty() && - writeq.front().first < ebofs->super_epoch) { - dout(15) << " dropping uncommitted journal item from prior epoch" << endl; - writeq.pop_front(); - Context *oncommit = commitq.front(); - commitq.pop_front(); - - if (oncommit) { - oncommit->finish(0); - delete oncommit; - } - } + // update header + header.epoch1 = ebofs->super_epoch; + header.top1 = header.top2; + header.epoch2 = 0; + header.top2 = 0; + write_header(); + + // flush any unwritten items in previous epoch + while (!writeq.empty() && + writeq.front().first < ebofs->super_epoch) { + dout(15) << " dropping uncommitted journal item from prior epoch" << endl; + writeq.pop_front(); + Context *oncommit = commitq.front(); + commitq.pop_front(); + + if (oncommit) { + oncommit->finish(0); + delete oncommit; + } + } } - writer_lock.Unlock(); + write_lock.Unlock(); } diff --git a/trunk/ceph/ebofs/FileJournal.h b/trunk/ceph/ebofs/FileJournal.h index 33333f7259fb3..6c34f24f33955 100644 --- a/trunk/ceph/ebofs/FileJournal.h +++ b/trunk/ceph/ebofs/FileJournal.h @@ -21,6 +21,15 @@ class FileJournal : public Journal { +public: + struct header_t { + epoch_t epoch1; + off_t top1; + epoch_t epoch2; + off_t top2; + } header; + +private: string fn; off_t max_size; @@ -31,10 +40,12 @@ class FileJournal : public Journal { int fd; list > writeq; // currently journaling - map commitq; // currently journaling + list commitq; // currently journaling // write thread - bool writer_stop; + Mutex write_lock; + Cond write_cond; + bool write_stop; void write_header(); void start_writer(); @@ -42,23 +53,23 @@ class FileJournal : public Journal { void write_thread_entry(); class Writer : public Thread { - FileJournal *journal; + FileJournal *journal; public: - Writer(FileJournal *fj) : journal(fj) {} - void *entry() { - journal->write_thread(); - return 0; - } - } writer_thread; + Writer(FileJournal *fj) : journal(fj) {} + void *entry() { + journal->write_thread(); + return 0; + } + } write_thread; public: FileJournal(Ebofs *e, char *f, off_t sz) : - Journal(e), - fn(f), max_size(sz), - top(0), bottom(0), committing_to(0), - fd(0), - writer_stop(false), writer_thread(this) - { } + Journal(e), + fn(f), max_size(sz), + top(0), bottom(0), committing_to(0), + fd(0), + write_stop(false), write_thread(this) + { } ~FileJournal() {} void create(); -- 2.39.5