From 78d363f1d78ebd0c5652653ef767ba954f67277b Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 24 Jan 2008 02:12:48 -0800 Subject: [PATCH] hacked dio journal --- src/Makefile | 3 +- src/ebofs/DIOJournal.cc | 319 ++++++++++++++++++--------------------- src/ebofs/DIOJournal.h | 39 +++-- src/ebofs/Ebofs.cc | 2 + src/ebofs/FileJournal.cc | 4 +- 5 files changed, 173 insertions(+), 194 deletions(-) diff --git a/src/Makefile b/src/Makefile index 6f383bcb49868..023f5e144f0fd 100644 --- a/src/Makefile +++ b/src/Makefile @@ -53,7 +53,8 @@ EBOFS_OBJS= \ ebofs/BufferCache.o\ ebofs/Ebofs.o\ ebofs/Allocator.o\ - ebofs/FileJournal.o + ebofs/FileJournal.o\ + ebofs/DIOJournal.o MDS_OBJS= \ mds/MDS.o\ diff --git a/src/ebofs/DIOJournal.cc b/src/ebofs/DIOJournal.cc index 75eb408215454..9146abcb8fa79 100644 --- a/src/ebofs/DIOJournal.cc +++ b/src/ebofs/DIOJournal.cc @@ -27,7 +27,7 @@ #define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal " -int DIOJournal::create() +int DioJournal::create() { dout(2) << "create " << fn << dendl; @@ -56,14 +56,14 @@ int DIOJournal::create() // writeable. read_pos = 0; - write_pos = queue_pos = sizeof(header); + write_pos = sizeof(header); ::close(fd); return 0; } -int DIOJournal::open() +int DioJournal::open() { //dout(1) << "open " << fn << dendl; @@ -78,7 +78,7 @@ int DIOJournal::open() // assume writeable, unless... read_pos = 0; - write_pos = queue_pos = sizeof(header); + write_pos = sizeof(header); // read header? read_header(); @@ -93,7 +93,7 @@ int DIOJournal::open() << header.epoch[i] << " at " << header.offset[i] << dendl; read_pos = header.offset[i]; - write_pos = queue_pos = 0; + write_pos = 0; break; } else if (header.epoch[i] < ebofs->get_super_epoch()) { @@ -115,7 +115,7 @@ int DIOJournal::open() return 0; } -void DIOJournal::close() +void DioJournal::close() { dout(1) << "close " << fn << dendl; @@ -130,13 +130,13 @@ void DIOJournal::close() fd = 0; } -void DIOJournal::start_writer() +void DioJournal::start_writer() { write_stop = false; write_thread.create(); } -void DIOJournal::stop_writer() +void DioJournal::stop_writer() { write_lock.Lock(); { @@ -148,7 +148,7 @@ void DIOJournal::stop_writer() } -void DIOJournal::print_header() +void DioJournal::print_header() { for (int i=0; i header.last_epoch()) { + dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl; + header.push(epoch, write_pos); + } - // write! - dout(-15) << "write_thread_entry writing " << write_pos << " : " - << len << " (" << dolen << ")" - << " epoch " << epoch - << dendl; - - // write entry header - //entry_header_t h; - h->epoch = epoch; - h->len = len;//bl.length(); - h->make_magic(write_pos, header.fsid); - - ::lseek(fd, write_pos, SEEK_SET); - /* - ::write(fd, &h, sizeof(h)); - for (list::const_iterator it = bl.buffers().begin(); - it != bl.buffers().end(); - it++) { - if ((*it).length() == 0) continue; // blank buffer. - ::write(fd, (char*)(*it).c_str(), (*it).length() ); + off_t size = 2*sizeof(entry_header_t) + ebl.length(); + size = DIV_ROUND_UP(size, 4096) * 4096; + + // does it fit? + if (header.wrap) { + // we're wrapped. don't overwrite ourselves. + if (write_pos + size >= header.offset[0]) { + dout(10) << "JOURNAL FULL (and wrapped), " << write_pos << "+" << size + << " >= " << header.offset[0] + << dendl; + full = true; + writeq.clear(); + print_header(); + continue; } - ::write(fd, &h, sizeof(h)); - ::fsync(fd); - */ - int r = ::write(fd, bp.c_str(), dolen); - if (r != dolen) derr(0) << "write got " << r << " not " << dolen << dendl; - - // move position pointer - write_pos += dolen; - - if (oncommit) { - if (1) { - // queue callback - ebofs->queue_finisher(oncommit); + } else { + // we haven't wrapped. + if (write_pos + size >= header.max_size) { + // is there room if we wrap? + if ((off_t)sizeof(header_t) + size < header.offset[0]) { + // yes! + dout(10) << "wrapped from " << write_pos << " to " << sizeof(header_t) << dendl; + header.wrap = write_pos; + write_pos = sizeof(header_t); + header.push(ebofs->get_super_epoch(), write_pos); + write_header(); } else { - // callback now - oncommit->finish(0); - delete oncommit; + // no room. + dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << write_pos << "+" << size + << " >= " << header.max_size + << dendl; + full = true; + writeq.clear(); + continue; } } } + + // build it + bufferptr bp = buffer::create_page_aligned(size); + entry_header_t *h = (entry_header_t*)bp.c_str(); + ebl.copy(0, ebl.length(), bp.c_str()+sizeof(entry_header_t)); + + // add to write buffer + dout(15) << "write_thread_entry will write " << write_pos << " : " + << bp.length() + << " epoch " << epoch + << dendl; + + // add it this entry + h->epoch = epoch; + h->len = ebl.length(); + h->make_magic(write_pos, header.fsid); + memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h)); + + Context *oncommit = commitq.front(); + if (oncommit) + writingq.push_back(oncommit); + + // pop from writeq + writeq.pop_front(); + commitq.pop_front(); + + writing = true; + write_lock.Unlock(); + dout(15) << "write_thread_entry writing " << write_pos << "~" << bp.length() << dendl; + + ::lseek(fd, write_pos, SEEK_SET); + ::write(fd, bp.c_str(), size); + + write_lock.Lock(); + writing = false; + write_pos += bp.length(); + ebofs->queue_finishers(writingq); } - write_lock.Unlock(); dout(10) << "write_thread_entry finish" << dendl; } -bool DIOJournal::submit_entry(bufferlist& e, Context *oncommit) +bool DioJournal::is_full() { - assert(queue_pos != 0); // bad create(), or journal didn't replay to completion. - - // ** lock ** Mutex::Locker locker(write_lock); + return full; +} - // wrap? full? - off_t size = 2*sizeof(entry_header_t) + e.length(); - - if (full) return false; // already marked full. +void DioJournal::submit_entry(bufferlist& e, Context *oncommit) +{ + Mutex::Locker locker(write_lock); // ** lock ** - if (header.wrap) { - // we're wrapped. don't overwrite ourselves. - if (queue_pos + size >= header.offset[0]) { - derr(0) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size - << " >= " << header.offset[0] - << dendl; - full = true; - print_header(); - return false; - } - } else { - // we haven't wrapped. - if (queue_pos + size >= header.max_size) { - // is there room if we wrap? - if ((off_t)sizeof(header_t) + size < header.offset[0]) { - // yes! - dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl; - header.wrap = queue_pos; - queue_pos = sizeof(header_t); - header.push(ebofs->get_super_epoch(), queue_pos); - } else { - // no room. - derr(0) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size - << " >= " << header.max_size - << dendl; - full = true; - return false; - } - } - } - - dout(10) << "submit_entry " << queue_pos << " : " << e.length() + // dump on queue + dout(10) << "submit_entry " << e.length() << " epoch " << ebofs->get_super_epoch() << " " << oncommit << dendl; - - // dump on queue - writeq.push_back(pair(ebofs->get_super_epoch(), e)); commitq.push_back(oncommit); - - queue_pos += size; - - // kick writer thread - write_cond.Signal(); - - return true; + if (!full) { + writeq.push_back(pair(ebofs->get_super_epoch(), e)); + write_cond.Signal(); // kick writer thread + } } -void DIOJournal::commit_epoch_start() +void DioJournal::commit_epoch_start() { dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1 << " -- new epoch " << ebofs->get_super_epoch() @@ -346,71 +325,61 @@ void DIOJournal::commit_epoch_start() return; } - derr(0) << " clearing FULL flag, journal now usable" << dendl; + dout(1) << " clearing FULL flag, journal now usable" << dendl; full = false; } - - // note epoch boundary - header.push(ebofs->get_super_epoch(), queue_pos); // note: these entries may not yet be written. - //write_header(); // no need to write it now, though... } -void DIOJournal::commit_epoch_finish() +void DioJournal::commit_epoch_finish() { dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << dendl; - write_lock.Lock(); - { - if (full) { - // full journal damage control. - dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl; - header.clear(); - write_pos = queue_pos = sizeof(header_t); - } else { - // update header -- trim/discard old (committed) epochs - while (header.epoch[0] < ebofs->get_super_epoch()) - header.pop(); - } - write_header(); - - // discard any unwritten items in previous epoch, and do callbacks - epoch_t epoch = ebofs->get_super_epoch(); - list callbacks; - while (!writeq.empty() && writeq.front().first < epoch) { - dout(15) << " dropping unwritten and committed " - << write_pos << " : " << writeq.front().second.length() - << " epoch " << writeq.front().first - << dendl; - // finisher? - Context *oncommit = commitq.front(); - if (oncommit) callbacks.push_back(oncommit); - - write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length(); - - // discard. - writeq.pop_front(); - commitq.pop_front(); - } - - // queue the finishers - ebofs->queue_finishers(callbacks); - } - write_lock.Unlock(); - + Mutex::Locker locker(write_lock); + + if (full) { + // full journal damage control. + dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl; + header.clear(); + write_pos = sizeof(header_t); + } else { + // update header -- trim/discard old (committed) epochs + while (header.num && header.epoch[0] < ebofs->get_super_epoch()) + header.pop(); + } + write_header(); + + // discard any unwritten items in previous epoch + epoch_t epoch = ebofs->get_super_epoch(); + while (!writeq.empty() && writeq.front().first < epoch) { + dout(15) << " dropping unwritten and committed " + << write_pos << " : " << writeq.front().second.length() + << " epoch " << writeq.front().first + << dendl; + // finisher? + Context *oncommit = commitq.front(); + if (oncommit) writingq.push_back(oncommit); + + // discard. + writeq.pop_front(); + commitq.pop_front(); + } + + // queue the finishers + ebofs->queue_finishers(writingq); } -void DIOJournal::make_writeable() +void DioJournal::make_writeable() { if (read_pos) - write_pos = queue_pos = read_pos; + write_pos = read_pos; else - write_pos = queue_pos = sizeof(header_t); + write_pos = sizeof(header_t); read_pos = 0; } -bool DIOJournal::read_entry(bufferlist& bl, epoch_t& epoch) +bool DioJournal::read_entry(bufferlist& bl, epoch_t& epoch) { if (!read_pos) { dout(2) << "read_entry -- not readable" << dendl; diff --git a/src/ebofs/DIOJournal.h b/src/ebofs/DIOJournal.h index 32e6e8e96ec16..38a72ccf8f74d 100644 --- a/src/ebofs/DIOJournal.h +++ b/src/ebofs/DIOJournal.h @@ -1,14 +1,14 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* - * Ceph - scalable distributed file system + * Ceph - scalable distributed dio system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. + * Foundation. See dio COPYING. * */ @@ -22,13 +22,13 @@ #include "common/Mutex.h" #include "common/Thread.h" -class DIOJournal : public Journal { +class DioJournal : public Journal { public: /** log header * we allow 4 pointers: * top/initial, * one for an epoch boundary (if any), - * one for a wrap in the ring buffer/journal file, + * one for a wrap in the ring buffer/journal dio, * one for a second epoch boundary (if any). * the epoch boundary one is useful only for speedier recovery in certain cases * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!) @@ -66,6 +66,9 @@ public: offset[num] = o; num++; } + epoch_t last_epoch() { + return epoch[num-1]; + } } header; struct entry_header_t { @@ -88,16 +91,18 @@ public: private: string fn; - bool full; + bool full, writing; off_t write_pos; // byte where next entry written goes - off_t queue_pos; // byte where next entry queued for write goes - off_t read_pos; // int fd; - list > writeq; // currently journaling - list commitq; // currently journaling + // to be journaled + list > writeq; + list commitq; + + // being journaled + list writingq; // write thread Mutex write_lock; @@ -112,9 +117,9 @@ private: void write_thread_entry(); class Writer : public Thread { - DIOJournal *journal; + DioJournal *journal; public: - Writer(DIOJournal *fj) : journal(fj) {} + Writer(DioJournal *fj) : journal(fj) {} void *entry() { journal->write_thread_entry(); return 0; @@ -122,13 +127,13 @@ private: } write_thread; public: - DIOJournal(Ebofs *e, char *f) : + DioJournal(Ebofs *e, const char *f) : Journal(e), fn(f), - full(false), - write_pos(0), queue_pos(0), read_pos(0), + full(false), writing(false), + write_pos(0), read_pos(0), fd(0), write_stop(false), write_thread(this) { } - ~DIOJournal() {} + ~DioJournal() {} int create(); int open(); @@ -137,12 +142,14 @@ private: void make_writeable(); // writes - bool submit_entry(bufferlist& e, Context *oncommit); // submit an item + void submit_entry(bufferlist& e, Context *oncommit); // submit an item void commit_epoch_start(); // mark epoch boundary void commit_epoch_finish(); // mark prior epoch as committed (we can expire) bool read_entry(bufferlist& bl, epoch_t& e); + bool is_full(); + // reads }; diff --git a/src/ebofs/Ebofs.cc b/src/ebofs/Ebofs.cc index 5d1e5f31a2618..e6410abc8430c 100644 --- a/src/ebofs/Ebofs.cc +++ b/src/ebofs/Ebofs.cc @@ -17,6 +17,7 @@ #include "Ebofs.h" #include "FileJournal.h" +#include "DIOJournal.h" #include @@ -125,6 +126,7 @@ int Ebofs::mount() // open journal? if (journalfn) { journal = new FileJournal(this, journalfn); + //journal = new DioJournal(this, journalfn); if (journal->open() < 0) { dout(3) << "mount journal " << journalfn << " open failed" << dendl; delete journal; diff --git a/src/ebofs/FileJournal.cc b/src/ebofs/FileJournal.cc index 24ec04785cbb2..82d82a0b8a6e9 100644 --- a/src/ebofs/FileJournal.cc +++ b/src/ebofs/FileJournal.cc @@ -276,7 +276,7 @@ void FileJournal::write_thread_entry() // write anything? if (bl.length() > 0) { writing = true; - write_lock.Unlock(); + //write_lock.Unlock(); dout(15) << "write_thread_entry writing " << write_pos << "~" << bl.length() << dendl; ::lseek(fd, write_pos, SEEK_SET); @@ -287,7 +287,7 @@ void FileJournal::write_thread_entry() ::write(fd, (char*)(*it).c_str(), (*it).length() ); } - write_lock.Lock(); + //write_lock.Lock(); writing = false; write_pos = queue_pos; ebofs->queue_finishers(writingq); -- 2.39.5