From 219710e1c00a7a4685e576ea1a141089d759a716 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 24 Jan 2008 16:33:32 -0800 Subject: [PATCH] roll directio into FileJournal --- src/Makefile | 3 +- src/ebofs/DIOJournal.cc | 438 --------------------------------------- src/ebofs/DIOJournal.h | 156 -------------- src/ebofs/Ebofs.cc | 1 - src/ebofs/FileJournal.cc | 141 ++++++++++++- src/ebofs/FileJournal.h | 18 +- 6 files changed, 147 insertions(+), 610 deletions(-) delete mode 100644 src/ebofs/DIOJournal.cc delete mode 100644 src/ebofs/DIOJournal.h diff --git a/src/Makefile b/src/Makefile index 023f5e144f0fd..6f383bcb49868 100644 --- a/src/Makefile +++ b/src/Makefile @@ -53,8 +53,7 @@ EBOFS_OBJS= \ ebofs/BufferCache.o\ ebofs/Ebofs.o\ ebofs/Allocator.o\ - ebofs/FileJournal.o\ - ebofs/DIOJournal.o + ebofs/FileJournal.o MDS_OBJS= \ mds/MDS.o\ diff --git a/src/ebofs/DIOJournal.cc b/src/ebofs/DIOJournal.cc deleted file mode 100644 index 9146abcb8fa79..0000000000000 --- a/src/ebofs/DIOJournal.cc +++ /dev/null @@ -1,438 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "DIOJournal.h" -#include "Ebofs.h" - -#include -#include -#include -#include - - -#include "config.h" - -#define dout(x) if (x <= g_conf.debug_ebofs) *_dout << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal " -#define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal " - - -int DioJournal::create() -{ - dout(2) << "create " << fn << dendl; - - // open/create - fd = ::open(fn.c_str(), O_RDWR|O_DIRECT); - if (fd < 0) { - dout(2) << "create failed " << errno << " " << strerror(errno) << dendl; - return -errno; - } - assert(fd > 0); - - //::ftruncate(fd, 0); - //::fchmod(fd, 0644); - - // get size - struct stat st; - ::fstat(fd, &st); - dout(2) << "create " << fn << " " << st.st_size << " bytes" << dendl; - - // write empty header - memset(&header, 0, sizeof(header)); - header.clear(); - header.fsid = ebofs->get_fsid(); - header.max_size = st.st_size; - write_header(); - - // writeable. - read_pos = 0; - write_pos = sizeof(header); - - ::close(fd); - - return 0; -} - -int DioJournal::open() -{ - //dout(1) << "open " << fn << dendl; - - // open and file - assert(fd == 0); - fd = ::open(fn.c_str(), O_RDWR|O_SYNC); - if (fd < 0) { - dout(2) << "open failed " << errno << " " << strerror(errno) << dendl; - return -errno; - } - assert(fd > 0); - - // assume writeable, unless... - read_pos = 0; - write_pos = sizeof(header); - - // read header? - read_header(); - if (header.fsid != ebofs->get_fsid()) { - dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl; - } - else if (header.num > 0) { - // valid header, pick an offset - for (int i=0; iget_super_epoch()) { - dout(2) << "using read_pos header pointer " - << header.epoch[i] << " at " << header.offset[i] - << dendl; - read_pos = header.offset[i]; - write_pos = 0; - break; - } - else if (header.epoch[i] < ebofs->get_super_epoch()) { - dout(2) << "super_epoch is " << ebofs->get_super_epoch() - << ", skipping old " << header.epoch[i] << " at " << header.offset[i] - << dendl; - } - else if (header.epoch[i] > ebofs->get_super_epoch()) { - dout(2) << "super_epoch is " << ebofs->get_super_epoch() - << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i] - << dendl; - break; - } - } - } - - start_writer(); - - return 0; -} - -void DioJournal::close() -{ - dout(1) << "close " << fn << dendl; - - // stop writer thread - stop_writer(); - - // close - assert(writeq.empty()); - assert(commitq.empty()); - assert(fd > 0); - ::close(fd); - fd = 0; -} - -void DioJournal::start_writer() -{ - write_stop = false; - write_thread.create(); -} - -void DioJournal::stop_writer() -{ - write_lock.Lock(); - { - write_stop = true; - write_cond.Signal(); - } - write_lock.Unlock(); - write_thread.join(); -} - - -void DioJournal::print_header() -{ - for (int i=0; i header.last_epoch()) { - dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl; - header.push(epoch, write_pos); - } - - off_t size = 2*sizeof(entry_header_t) + ebl.length(); - size = DIV_ROUND_UP(size, 4096) * 4096; - - // does it fit? - if (header.wrap) { - // we're wrapped. don't overwrite ourselves. - if (write_pos + size >= header.offset[0]) { - dout(10) << "JOURNAL FULL (and wrapped), " << write_pos << "+" << size - << " >= " << header.offset[0] - << dendl; - full = true; - writeq.clear(); - print_header(); - continue; - } - } else { - // we haven't wrapped. - if (write_pos + size >= header.max_size) { - // is there room if we wrap? - if ((off_t)sizeof(header_t) + size < header.offset[0]) { - // yes! - dout(10) << "wrapped from " << write_pos << " to " << sizeof(header_t) << dendl; - header.wrap = write_pos; - write_pos = sizeof(header_t); - header.push(ebofs->get_super_epoch(), write_pos); - write_header(); - } else { - // no room. - dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << write_pos << "+" << size - << " >= " << header.max_size - << dendl; - full = true; - writeq.clear(); - continue; - } - } - } - - // build it - bufferptr bp = buffer::create_page_aligned(size); - entry_header_t *h = (entry_header_t*)bp.c_str(); - ebl.copy(0, ebl.length(), bp.c_str()+sizeof(entry_header_t)); - - // add to write buffer - dout(15) << "write_thread_entry will write " << write_pos << " : " - << bp.length() - << " epoch " << epoch - << dendl; - - // add it this entry - h->epoch = epoch; - h->len = ebl.length(); - h->make_magic(write_pos, header.fsid); - memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h)); - - Context *oncommit = commitq.front(); - if (oncommit) - writingq.push_back(oncommit); - - // pop from writeq - writeq.pop_front(); - commitq.pop_front(); - - writing = true; - write_lock.Unlock(); - dout(15) << "write_thread_entry writing " << write_pos << "~" << bp.length() << dendl; - - ::lseek(fd, write_pos, SEEK_SET); - ::write(fd, bp.c_str(), size); - - write_lock.Lock(); - writing = false; - write_pos += bp.length(); - ebofs->queue_finishers(writingq); - } - write_lock.Unlock(); - dout(10) << "write_thread_entry finish" << dendl; -} - -bool DioJournal::is_full() -{ - Mutex::Locker locker(write_lock); - return full; -} - -void DioJournal::submit_entry(bufferlist& e, Context *oncommit) -{ - Mutex::Locker locker(write_lock); // ** lock ** - - // dump on queue - dout(10) << "submit_entry " << e.length() - << " epoch " << ebofs->get_super_epoch() - << " " << oncommit << dendl; - commitq.push_back(oncommit); - if (!full) { - writeq.push_back(pair(ebofs->get_super_epoch(), e)); - write_cond.Signal(); // kick writer thread - } -} - - -void DioJournal::commit_epoch_start() -{ - dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1 - << " -- new epoch " << ebofs->get_super_epoch() - << dendl; - - Mutex::Locker locker(write_lock); - - // was full -> empty -> now usable? - if (full) { - if (header.num != 0) { - dout(1) << " journal FULL, ignoring this epoch" << dendl; - return; - } - - dout(1) << " clearing FULL flag, journal now usable" << dendl; - full = false; - } -} - -void DioJournal::commit_epoch_finish() -{ - dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << dendl; - - Mutex::Locker locker(write_lock); - - if (full) { - // full journal damage control. - dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl; - header.clear(); - write_pos = sizeof(header_t); - } else { - // update header -- trim/discard old (committed) epochs - while (header.num && header.epoch[0] < ebofs->get_super_epoch()) - header.pop(); - } - write_header(); - - // discard any unwritten items in previous epoch - epoch_t epoch = ebofs->get_super_epoch(); - while (!writeq.empty() && writeq.front().first < epoch) { - dout(15) << " dropping unwritten and committed " - << write_pos << " : " << writeq.front().second.length() - << " epoch " << writeq.front().first - << dendl; - // finisher? - Context *oncommit = commitq.front(); - if (oncommit) writingq.push_back(oncommit); - - // discard. - writeq.pop_front(); - commitq.pop_front(); - } - - // queue the finishers - ebofs->queue_finishers(writingq); -} - - -void DioJournal::make_writeable() -{ - if (read_pos) - write_pos = read_pos; - else - write_pos = sizeof(header_t); - read_pos = 0; -} - - -bool DioJournal::read_entry(bufferlist& bl, epoch_t& epoch) -{ - if (!read_pos) { - dout(2) << "read_entry -- not readable" << dendl; - return false; - } - - if (read_pos == header.wrap) { - // find wrap point - for (int i=1; i - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See dio COPYING. - * - */ - - -#ifndef __EBOFS_DIOJOURNAL_H -#define __EBOFS_DIOJOURNAL_H - - -#include "Journal.h" -#include "common/Cond.h" -#include "common/Mutex.h" -#include "common/Thread.h" - -class DioJournal : public Journal { -public: - /** log header - * we allow 4 pointers: - * top/initial, - * one for an epoch boundary (if any), - * one for a wrap in the ring buffer/journal dio, - * one for a second epoch boundary (if any). - * the epoch boundary one is useful only for speedier recovery in certain cases - * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!) - */ - struct header_t { - uint64_t fsid; - int num; - off_t wrap; - off_t max_size; - epoch_t epoch[4]; - off_t offset[4]; - - header_t() : fsid(0), num(0), wrap(0), max_size(0) {} - - void clear() { - num = 0; - wrap = 0; - } - void pop() { - if (num >= 2 && offset[0] > offset[1]) - wrap = 0; // we're eliminating a wrap - num--; - for (int i=0; i 2 && - epoch[num-1] == e && - epoch[num-2] == (e-1)) - num--; // tail was an epoch boundary; replace it. - epoch[num] = e; - offset[num] = o; - num++; - } - epoch_t last_epoch() { - return epoch[num-1]; - } - } header; - - struct entry_header_t { - uint64_t epoch; - uint64_t len; - uint64_t magic1; - uint64_t magic2; - - void make_magic(off_t pos, uint64_t fsid) { - magic1 = pos; - magic2 = fsid ^ epoch ^ len; - } - bool check_magic(off_t pos, uint64_t fsid) { - return - magic1 == (uint64_t)pos && - magic2 == (fsid ^ epoch ^ len); - } - }; - -private: - string fn; - - bool full, writing; - off_t write_pos; // byte where next entry written goes - off_t read_pos; // - - int fd; - - // to be journaled - list > writeq; - list commitq; - - // being journaled - list writingq; - - // write thread - Mutex write_lock; - Cond write_cond; - bool write_stop; - - void print_header(); - void read_header(); - void write_header(); - void start_writer(); - void stop_writer(); - void write_thread_entry(); - - class Writer : public Thread { - DioJournal *journal; - public: - Writer(DioJournal *fj) : journal(fj) {} - void *entry() { - journal->write_thread_entry(); - return 0; - } - } write_thread; - - public: - DioJournal(Ebofs *e, const char *f) : - Journal(e), fn(f), - full(false), writing(false), - write_pos(0), read_pos(0), - fd(0), - write_stop(false), write_thread(this) { } - ~DioJournal() {} - - int create(); - int open(); - void close(); - - void make_writeable(); - - // writes - void submit_entry(bufferlist& e, Context *oncommit); // submit an item - void commit_epoch_start(); // mark epoch boundary - void commit_epoch_finish(); // mark prior epoch as committed (we can expire) - - bool read_entry(bufferlist& bl, epoch_t& e); - - bool is_full(); - - // reads -}; - -#endif diff --git a/src/ebofs/Ebofs.cc b/src/ebofs/Ebofs.cc index e6410abc8430c..176c263e3f3ec 100644 --- a/src/ebofs/Ebofs.cc +++ b/src/ebofs/Ebofs.cc @@ -17,7 +17,6 @@ #include "Ebofs.h" #include "FileJournal.h" -#include "DIOJournal.h" #include diff --git a/src/ebofs/FileJournal.cc b/src/ebofs/FileJournal.cc index 82d82a0b8a6e9..d2d26fb1de12f 100644 --- a/src/ebofs/FileJournal.cc +++ b/src/ebofs/FileJournal.cc @@ -45,7 +45,8 @@ int FileJournal::create() // get size struct stat st; ::fstat(fd, &st); - dout(2) << "create " << fn << " " << st.st_size << " bytes" << dendl; + block_size = st.st_blksize; + dout(2) << "create " << fn << " " << st.st_size << " bytes, block size " << block_size << dendl; // write empty header memset(&header, 0, sizeof(header)); @@ -56,7 +57,7 @@ int FileJournal::create() // writeable. read_pos = 0; - write_pos = sizeof(header); + write_pos = get_top(); ::close(fd); @@ -78,7 +79,7 @@ int FileJournal::open() // assume writeable, unless... read_pos = 0; - write_pos = sizeof(header); + write_pos = get_top(); // read header? read_header(); @@ -161,21 +162,36 @@ void FileJournal::print_header() } void FileJournal::read_header() { + int r; dout(10) << "read_header" << dendl; - memset(&header, 0, sizeof(header)); // zero out (read may fail) ::lseek(fd, 0, SEEK_SET); - int r = ::read(fd, &header, sizeof(header)); + if (directio) { + buffer::ptr bp = buffer::create_page_aligned(block_size); + bp.zero(); + r = ::read(fd, bp.c_str(), bp.length()); + memcpy(&header, bp.c_str(), sizeof(header)); + } else { + memset(&header, 0, sizeof(header)); // zero out (read may fail) + r = ::read(fd, &header, sizeof(header)); + } if (r < 0) dout(0) << "read_header error " << errno << " " << strerror(errno) << dendl; print_header(); } void FileJournal::write_header() { + int r; dout(10) << "write_header " << dendl; print_header(); - ::lseek(fd, 0, SEEK_SET); - int r = ::write(fd, &header, sizeof(header)); + if (directio) { + buffer::ptr bp = buffer::create_page_aligned(block_size); + bp.zero(); + memcpy(bp.c_str(), &header, sizeof(header)); + r = ::write(fd, bp.c_str(), bp.length()); + } else { + r = ::write(fd, &header, sizeof(header)); + } if (r < 0) dout(0) << "write_header error " << errno << " " << strerror(errno) << dendl; } @@ -231,9 +247,9 @@ void FileJournal::write_thread_entry() // is there room if we wrap? if ((off_t)sizeof(header_t) + size < header.offset[0]) { // yes! - dout(10) << "wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl; + dout(10) << "wrapped from " << queue_pos << " to " << get_top() << dendl; header.wrap = queue_pos; - queue_pos = sizeof(header_t); + queue_pos = get_top(); header.push(ebofs->get_super_epoch(), queue_pos); write_header(); } else { @@ -297,6 +313,109 @@ void FileJournal::write_thread_entry() dout(10) << "write_thread_entry finish" << dendl; } +void FileJournal::write_thread_entry_dio() +{ + dout(10) << "write_thread_entry start" << dendl; + write_lock.Lock(); + + while (!write_stop) { + if (writeq.empty()) { + // sleep + dout(20) << "write_thread_entry going to sleep" << dendl; + write_cond.Wait(write_lock); + dout(20) << "write_thread_entry woke up" << dendl; + continue; + } + + // grab next item + epoch_t epoch = writeq.front().first; + bufferlist &ebl = writeq.front().second; + + // epoch boundary? + if (epoch > header.last_epoch()) { + dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl; + header.push(epoch, write_pos); + } + + off_t size = 2*sizeof(entry_header_t) + ebl.length(); + size = DIV_ROUND_UP(size, 4096) * 4096; + + // does it fit? + if (header.wrap) { + // we're wrapped. don't overwrite ourselves. + if (write_pos + size >= header.offset[0]) { + dout(10) << "JOURNAL FULL (and wrapped), " << write_pos << "+" << size + << " >= " << header.offset[0] + << dendl; + full = true; + writeq.clear(); + print_header(); + continue; + } + } else { + // we haven't wrapped. + if (write_pos + size >= header.max_size) { + // is there room if we wrap? + if ((off_t)sizeof(header_t) + size < header.offset[0]) { + // yes! + dout(10) << "wrapped from " << write_pos << " to " << sizeof(header_t) << dendl; + header.wrap = write_pos; + write_pos = sizeof(header_t); + header.push(ebofs->get_super_epoch(), write_pos); + write_header(); + } else { + // no room. + dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << write_pos << "+" << size + << " >= " << header.max_size + << dendl; + full = true; + writeq.clear(); + continue; + } + } + } + + // build it + bufferptr bp = buffer::create_page_aligned(size); + entry_header_t *h = (entry_header_t*)bp.c_str(); + ebl.copy(0, ebl.length(), bp.c_str()+sizeof(entry_header_t)); + + // add to write buffer + dout(15) << "write_thread_entry will write " << write_pos << " : " + << bp.length() + << " epoch " << epoch + << dendl; + + // add it this entry + h->epoch = epoch; + h->len = ebl.length(); + h->make_magic(write_pos, header.fsid); + memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h)); + + Context *oncommit = commitq.front(); + if (oncommit) + writingq.push_back(oncommit); + + // pop from writeq + writeq.pop_front(); + commitq.pop_front(); + + writing = true; + write_lock.Unlock(); + dout(15) << "write_thread_entry writing " << write_pos << "~" << bp.length() << dendl; + + ::lseek(fd, write_pos, SEEK_SET); + ::write(fd, bp.c_str(), size); + + write_lock.Lock(); + writing = false; + write_pos += bp.length(); + ebofs->queue_finishers(writingq); + } + write_lock.Unlock(); + dout(10) << "write_thread_entry finish" << dendl; +} + bool FileJournal::is_full() { Mutex::Locker locker(write_lock); @@ -349,7 +468,7 @@ void FileJournal::commit_epoch_finish() // full journal damage control. dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl; header.clear(); - write_pos = sizeof(header_t); + write_pos = get_top(); } else { // update header -- trim/discard old (committed) epochs while (header.num && header.epoch[0] < ebofs->get_super_epoch()) @@ -383,7 +502,7 @@ void FileJournal::make_writeable() if (read_pos) write_pos = read_pos; else - write_pos = sizeof(header_t); + write_pos = get_top(); read_pos = 0; } diff --git a/src/ebofs/FileJournal.h b/src/ebofs/FileJournal.h index 84b3a603f74a8..050b6f251be5f 100644 --- a/src/ebofs/FileJournal.h +++ b/src/ebofs/FileJournal.h @@ -91,6 +91,8 @@ public: private: string fn; + size_t block_size; + bool directio; bool full, writing; off_t write_pos; // byte where next entry written goes off_t read_pos; // @@ -115,20 +117,32 @@ private: void start_writer(); void stop_writer(); void write_thread_entry(); + void write_thread_entry_dio(); class Writer : public Thread { FileJournal *journal; public: Writer(FileJournal *fj) : journal(fj) {} void *entry() { - journal->write_thread_entry(); + if (journal->directio) + journal->write_thread_entry_dio(); + else + journal->write_thread_entry(); return 0; } } write_thread; + off_t get_top() { + if (directio) + return block_size; + else + return sizeof(header); + } + public: - FileJournal(Ebofs *e, const char *f) : + FileJournal(Ebofs *e, const char *f, bool dio=false) : Journal(e), fn(f), + directio(dio), full(false), writing(false), write_pos(0), read_pos(0), fd(0), -- 2.39.5