void FileJournal::start_writer()
{
- writer_stop = false;
+ write_stop = false;
write_thread.create();
}
void FileJournal::stop_writer()
{
- writer_lock.Lock();
+ write_lock.Lock();
{
- writer_stop = true;
- writer_cond.Signal();
+ write_stop = true;
+ write_cond.Signal();
}
- writer_lock.Unlock();
+ write_lock.Unlock();
write_thread.join();
}
+
+void FileJournal::write_header()
+{
+ dout(10) << "write_header" << endl;
+
+ ::lseek(fd, 0, SEEK_SET);
+ ::write(fd, &header, sizeof(header));
+}
+
+
void FileJournal::write_thread_entry()
{
dout(10) << "write_thread_entry start" << endl;
- writer_lock.Lock();
+ write_lock.Lock();
while (!write_stop) {
- if (writeq.empty()) {
- // sleep
- dout(20) << "write_thread_entry going to sleep" << endl;
- write_cond.Wait(writer_lock);
- dout(20) << "write_thread_entry woke up" << endl;
- continue;
- }
-
- // do queued writes
- while (!writeq.empty()) {
- // grab next item
- epoch_t e = writeq.front().first;
- bufferlist bl;
- bl.claim(writeq.front().second);
- writeq.pop_front();
- Context *oncommit = commitq.front();
- commitq.pop_front();
-
- dout(15) << "write_thread_entry writing " << bottom << " : "
- << bl.length()
- << " epoch " << e
- << endl;
-
- // write epoch, len, data.
- ::fseek(fd, bottom, SEEK_SET);
- ::write(fd, &e, sizeof(e));
-
- uint32_t len = bl.length();
- ::write(fd, &len, sizeof(len));
-
- for (list<bufferptr>::const_iterator it = bl.buffers().begin();
- it != bl.buffers().end();
- it++) {
- if ((*it).length() == 0) continue; // blank buffer.
- ::write(fd, (char*)(*it).c_str(), (*it).length() );
- }
-
- // move position pointer
- bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length();
-
- // do commit callback
- if (oncommit) {
- oncommit->finish(0);
- delete oncommit;
- }
- }
+ if (writeq.empty()) {
+ // sleep
+ dout(20) << "write_thread_entry going to sleep" << endl;
+ write_cond.Wait(write_lock);
+ dout(20) << "write_thread_entry woke up" << endl;
+ continue;
+ }
+
+ // do queued writes
+ while (!writeq.empty()) {
+ // grab next item
+ epoch_t e = writeq.front().first;
+ bufferlist bl;
+ bl.claim(writeq.front().second);
+ writeq.pop_front();
+ Context *oncommit = commitq.front();
+ commitq.pop_front();
+
+ dout(15) << "write_thread_entry writing " << bottom << " : "
+ << bl.length()
+ << " epoch " << e
+ << endl;
+
+ // write epoch, len, data.
+ ::fseek(fd, bottom, SEEK_SET);
+ ::write(fd, &e, sizeof(e));
+
+ uint32_t len = bl.length();
+ ::write(fd, &len, sizeof(len));
+
+ for (list<bufferptr>::const_iterator it = bl.buffers().begin();
+ it != bl.buffers().end();
+ it++) {
+ if ((*it).length() == 0) continue; // blank buffer.
+ ::write(fd, (char*)(*it).c_str(), (*it).length() );
+ }
+
+ // move position pointer
+ bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length();
+
+ // do commit callback
+ if (oncommit) {
+ oncommit->finish(0);
+ delete oncommit;
+ }
+ }
}
- writer_lock.Unlock();
+ write_lock.Unlock();
dout(10) << "write_thread_entry finish" << endl;
}
void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
{
dout(10) << "submit_entry " << bottom << " : " << e.length()
- << " epoch " << ebofs->super_epoch
- << " " << oncommit << endl;
-
+ << " epoch " << ebofs->super_epoch
+ << " " << oncommit << endl;
+
// dump on queue
writeq.push_back(pair<epoch_t,bufferlist>(ebofs->super_epoch, e));
commitq.push_back(oncommit);
// kick writer thread
- writer_cond.Signal();
+ write_cond.Signal();
}
void FileJournal::commit_epoch_start()
{
dout(10) << "commit_epoch_start" << endl;
+
+ write_lock.Lock();
+ {
+ header.epoch2 = ebofs->super_epoch;
+ header.top2 = bottom;
+ write_header();
+ }
+ write_lock.Unlock();
}
void FileJournal::commit_epoch_finish()
{
dout(10) << "commit_epoch_finish" << endl;
-
- // flush any unwritten items in previous epoch
- writer_lock.Lock();
+
+ write_lock.Lock();
{
- while (!writeq.empty() &&
- writeq.front().first < ebofs->super_epoch) {
- dout(15) << " dropping uncommitted journal item from prior epoch" << endl;
- writeq.pop_front();
- Context *oncommit = commitq.front();
- commitq.pop_front();
-
- if (oncommit) {
- oncommit->finish(0);
- delete oncommit;
- }
- }
+ // update header
+ header.epoch1 = ebofs->super_epoch;
+ header.top1 = header.top2;
+ header.epoch2 = 0;
+ header.top2 = 0;
+ write_header();
+
+ // flush any unwritten items in previous epoch
+ while (!writeq.empty() &&
+ writeq.front().first < ebofs->super_epoch) {
+ dout(15) << " dropping uncommitted journal item from prior epoch" << endl;
+ writeq.pop_front();
+ Context *oncommit = commitq.front();
+ commitq.pop_front();
+
+ if (oncommit) {
+ oncommit->finish(0);
+ delete oncommit;
+ }
+ }
}
- writer_lock.Unlock();
+ write_lock.Unlock();
}
class FileJournal : public Journal {
+public:
+ struct header_t {
+ epoch_t epoch1;
+ off_t top1;
+ epoch_t epoch2;
+ off_t top2;
+ } header;
+
+private:
string fn;
off_t max_size;
int fd;
list<pair<epoch_t,bufferlist> > writeq; // currently journaling
- map<off_t,Context*> commitq; // currently journaling
+ list<Context*> commitq; // currently journaling
// write thread
- bool writer_stop;
+ Mutex write_lock;
+ Cond write_cond;
+ bool write_stop;
void write_header();
void start_writer();
void write_thread_entry();
class Writer : public Thread {
- FileJournal *journal;
+ FileJournal *journal;
public:
- Writer(FileJournal *fj) : journal(fj) {}
- void *entry() {
- journal->write_thread();
- return 0;
- }
- } writer_thread;
+ Writer(FileJournal *fj) : journal(fj) {}
+ void *entry() {
+ journal->write_thread();
+ return 0;
+ }
+ } write_thread;
public:
FileJournal(Ebofs *e, char *f, off_t sz) :
- Journal(e),
- fn(f), max_size(sz),
- top(0), bottom(0), committing_to(0),
- fd(0),
- writer_stop(false), writer_thread(this)
- { }
+ Journal(e),
+ fn(f), max_size(sz),
+ top(0), bottom(0), committing_to(0),
+ fd(0),
+ write_stop(false), write_thread(this)
+ { }
~FileJournal() {}
void create();