+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "DIOJournal.h"
-#include "Ebofs.h"
-
-#include <stdio.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-
-#include "config.h"
-
-#define dout(x) if (x <= g_conf.debug_ebofs) *_dout << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
-#define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
-
-
-int DioJournal::create()
-{
- dout(2) << "create " << fn << dendl;
-
- // open/create
- fd = ::open(fn.c_str(), O_RDWR|O_DIRECT);
- if (fd < 0) {
- dout(2) << "create failed " << errno << " " << strerror(errno) << dendl;
- return -errno;
- }
- assert(fd > 0);
-
- //::ftruncate(fd, 0);
- //::fchmod(fd, 0644);
-
- // get size
- struct stat st;
- ::fstat(fd, &st);
- dout(2) << "create " << fn << " " << st.st_size << " bytes" << dendl;
-
- // write empty header
- memset(&header, 0, sizeof(header));
- header.clear();
- header.fsid = ebofs->get_fsid();
- header.max_size = st.st_size;
- write_header();
-
- // writeable.
- read_pos = 0;
- write_pos = sizeof(header);
-
- ::close(fd);
-
- return 0;
-}
-
-int DioJournal::open()
-{
- //dout(1) << "open " << fn << dendl;
-
- // open and file
- assert(fd == 0);
- fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
- if (fd < 0) {
- dout(2) << "open failed " << errno << " " << strerror(errno) << dendl;
- return -errno;
- }
- assert(fd > 0);
-
- // assume writeable, unless...
- read_pos = 0;
- write_pos = sizeof(header);
-
- // read header?
- read_header();
- if (header.fsid != ebofs->get_fsid()) {
- dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl;
- }
- else if (header.num > 0) {
- // valid header, pick an offset
- for (int i=0; i<header.num; i++) {
- if (header.epoch[i] == ebofs->get_super_epoch()) {
- dout(2) << "using read_pos header pointer "
- << header.epoch[i] << " at " << header.offset[i]
- << dendl;
- read_pos = header.offset[i];
- write_pos = 0;
- break;
- }
- else if (header.epoch[i] < ebofs->get_super_epoch()) {
- dout(2) << "super_epoch is " << ebofs->get_super_epoch()
- << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
- << dendl;
- }
- else if (header.epoch[i] > ebofs->get_super_epoch()) {
- dout(2) << "super_epoch is " << ebofs->get_super_epoch()
- << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
- << dendl;
- break;
- }
- }
- }
-
- start_writer();
-
- return 0;
-}
-
-void DioJournal::close()
-{
- dout(1) << "close " << fn << dendl;
-
- // stop writer thread
- stop_writer();
-
- // close
- assert(writeq.empty());
- assert(commitq.empty());
- assert(fd > 0);
- ::close(fd);
- fd = 0;
-}
-
-void DioJournal::start_writer()
-{
- write_stop = false;
- write_thread.create();
-}
-
-void DioJournal::stop_writer()
-{
- write_lock.Lock();
- {
- write_stop = true;
- write_cond.Signal();
- }
- write_lock.Unlock();
- write_thread.join();
-}
-
-
-void DioJournal::print_header()
-{
- for (int i=0; i<header.num; i++) {
- if (i && header.offset[i] < header.offset[i-1]) {
- assert(header.wrap);
- dout(10) << "header: wrap at " << header.wrap << dendl;
- }
- dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << dendl;
- }
- //if (header.wrap) dout(10) << "header: wrap at " << header.wrap << dendl;
-}
-void DioJournal::read_header()
-{
- dout(10) << "read_header" << dendl;
- char buf[4096];
- memset(buf, 0, 4096); // zero out (read may fail)
- ::lseek(fd, 0, SEEK_SET);
- int r = ::read(fd, buf, 4096);
- memcpy(&header, buf, sizeof(header));
- if (r < 0)
- dout(0) << "read_header error " << errno << " " << strerror(errno) << dendl;
- print_header();
-}
-void DioJournal::write_header()
-{
- dout(10) << "write_header " << dendl;
- print_header();
-
- ::lseek(fd, 0, SEEK_SET);
- char buf[4096];
- memcpy(buf, &header, sizeof(header));
- int r = ::write(fd, buf, 4096);
- if (r < 0)
- dout(0) << "write_header error " << errno << " " << strerror(errno) << dendl;
-}
-
-
-void DioJournal::write_thread_entry()
-{
- dout(10) << "write_thread_entry start" << dendl;
- write_lock.Lock();
-
- while (!write_stop) {
- if (writeq.empty()) {
- // sleep
- dout(20) << "write_thread_entry going to sleep" << dendl;
- write_cond.Wait(write_lock);
- dout(20) << "write_thread_entry woke up" << dendl;
- continue;
- }
-
- // grab next item
- epoch_t epoch = writeq.front().first;
- bufferlist &ebl = writeq.front().second;
-
- // epoch boundary?
- if (epoch > header.last_epoch()) {
- dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
- header.push(epoch, write_pos);
- }
-
- off_t size = 2*sizeof(entry_header_t) + ebl.length();
- size = DIV_ROUND_UP(size, 4096) * 4096;
-
- // does it fit?
- if (header.wrap) {
- // we're wrapped. don't overwrite ourselves.
- if (write_pos + size >= header.offset[0]) {
- dout(10) << "JOURNAL FULL (and wrapped), " << write_pos << "+" << size
- << " >= " << header.offset[0]
- << dendl;
- full = true;
- writeq.clear();
- print_header();
- continue;
- }
- } else {
- // we haven't wrapped.
- if (write_pos + size >= header.max_size) {
- // is there room if we wrap?
- if ((off_t)sizeof(header_t) + size < header.offset[0]) {
- // yes!
- dout(10) << "wrapped from " << write_pos << " to " << sizeof(header_t) << dendl;
- header.wrap = write_pos;
- write_pos = sizeof(header_t);
- header.push(ebofs->get_super_epoch(), write_pos);
- write_header();
- } else {
- // no room.
- dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << write_pos << "+" << size
- << " >= " << header.max_size
- << dendl;
- full = true;
- writeq.clear();
- continue;
- }
- }
- }
-
- // build it
- bufferptr bp = buffer::create_page_aligned(size);
- entry_header_t *h = (entry_header_t*)bp.c_str();
- ebl.copy(0, ebl.length(), bp.c_str()+sizeof(entry_header_t));
-
- // add to write buffer
- dout(15) << "write_thread_entry will write " << write_pos << " : "
- << bp.length()
- << " epoch " << epoch
- << dendl;
-
- // add it this entry
- h->epoch = epoch;
- h->len = ebl.length();
- h->make_magic(write_pos, header.fsid);
- memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h));
-
- Context *oncommit = commitq.front();
- if (oncommit)
- writingq.push_back(oncommit);
-
- // pop from writeq
- writeq.pop_front();
- commitq.pop_front();
-
- writing = true;
- write_lock.Unlock();
- dout(15) << "write_thread_entry writing " << write_pos << "~" << bp.length() << dendl;
-
- ::lseek(fd, write_pos, SEEK_SET);
- ::write(fd, bp.c_str(), size);
-
- write_lock.Lock();
- writing = false;
- write_pos += bp.length();
- ebofs->queue_finishers(writingq);
- }
- write_lock.Unlock();
- dout(10) << "write_thread_entry finish" << dendl;
-}
-
-bool DioJournal::is_full()
-{
- Mutex::Locker locker(write_lock);
- return full;
-}
-
-void DioJournal::submit_entry(bufferlist& e, Context *oncommit)
-{
- Mutex::Locker locker(write_lock); // ** lock **
-
- // dump on queue
- dout(10) << "submit_entry " << e.length()
- << " epoch " << ebofs->get_super_epoch()
- << " " << oncommit << dendl;
- commitq.push_back(oncommit);
- if (!full) {
- writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
- write_cond.Signal(); // kick writer thread
- }
-}
-
-
-void DioJournal::commit_epoch_start()
-{
- dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1
- << " -- new epoch " << ebofs->get_super_epoch()
- << dendl;
-
- Mutex::Locker locker(write_lock);
-
- // was full -> empty -> now usable?
- if (full) {
- if (header.num != 0) {
- dout(1) << " journal FULL, ignoring this epoch" << dendl;
- return;
- }
-
- dout(1) << " clearing FULL flag, journal now usable" << dendl;
- full = false;
- }
-}
-
-void DioJournal::commit_epoch_finish()
-{
- dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << dendl;
-
- Mutex::Locker locker(write_lock);
-
- if (full) {
- // full journal damage control.
- dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl;
- header.clear();
- write_pos = sizeof(header_t);
- } else {
- // update header -- trim/discard old (committed) epochs
- while (header.num && header.epoch[0] < ebofs->get_super_epoch())
- header.pop();
- }
- write_header();
-
- // discard any unwritten items in previous epoch
- epoch_t epoch = ebofs->get_super_epoch();
- while (!writeq.empty() && writeq.front().first < epoch) {
- dout(15) << " dropping unwritten and committed "
- << write_pos << " : " << writeq.front().second.length()
- << " epoch " << writeq.front().first
- << dendl;
- // finisher?
- Context *oncommit = commitq.front();
- if (oncommit) writingq.push_back(oncommit);
-
- // discard.
- writeq.pop_front();
- commitq.pop_front();
- }
-
- // queue the finishers
- ebofs->queue_finishers(writingq);
-}
-
-
-void DioJournal::make_writeable()
-{
- if (read_pos)
- write_pos = read_pos;
- else
- write_pos = sizeof(header_t);
- read_pos = 0;
-}
-
-
-bool DioJournal::read_entry(bufferlist& bl, epoch_t& epoch)
-{
- if (!read_pos) {
- dout(2) << "read_entry -- not readable" << dendl;
- return false;
- }
-
- if (read_pos == header.wrap) {
- // find wrap point
- for (int i=1; i<header.num; i++) {
- if (header.offset[i] < read_pos) {
- assert(header.offset[i-1] < read_pos);
- read_pos = header.offset[i];
- break;
- }
- }
- assert(read_pos != header.wrap);
- dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << dendl;
- }
-
- // header
- entry_header_t h;
- ::lseek(fd, read_pos, SEEK_SET);
- ::read(fd, &h, sizeof(h));
- if (!h.check_magic(read_pos, header.fsid)) {
- dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl;
- return false;
- }
-
- // body
- bufferptr bp(h.len);
- ::read(fd, bp.c_str(), h.len);
-
- // footer
- entry_header_t f;
- ::read(fd, &f, sizeof(h));
- if (!f.check_magic(read_pos, header.fsid) ||
- h.epoch != f.epoch ||
- h.len != f.len) {
- dout(2) << "read_entry " << read_pos << " : bad footer magic, partially entry, end of journal" << dendl;
- return false;
- }
-
-
- // yay!
- dout(1) << "read_entry " << read_pos << " : "
- << " " << h.len << " bytes"
- << " epoch " << h.epoch
- << dendl;
-
- bl.push_back(bp);
- epoch = h.epoch;
-
- read_pos += 2*sizeof(entry_header_t) + h.len;
-
- return true;
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed dio system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See dio COPYING.
- *
- */
-
-
-#ifndef __EBOFS_DIOJOURNAL_H
-#define __EBOFS_DIOJOURNAL_H
-
-
-#include "Journal.h"
-#include "common/Cond.h"
-#include "common/Mutex.h"
-#include "common/Thread.h"
-
-class DioJournal : public Journal {
-public:
- /** log header
- * we allow 4 pointers:
- * top/initial,
- * one for an epoch boundary (if any),
- * one for a wrap in the ring buffer/journal dio,
- * one for a second epoch boundary (if any).
- * the epoch boundary one is useful only for speedier recovery in certain cases
- * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
- */
- struct header_t {
- uint64_t fsid;
- int num;
- off_t wrap;
- off_t max_size;
- epoch_t epoch[4];
- off_t offset[4];
-
- header_t() : fsid(0), num(0), wrap(0), max_size(0) {}
-
- void clear() {
- num = 0;
- wrap = 0;
- }
- void pop() {
- if (num >= 2 && offset[0] > offset[1])
- wrap = 0; // we're eliminating a wrap
- num--;
- for (int i=0; i<num; i++) {
- epoch[i] = epoch[i+1];
- offset[i] = offset[i+1];
- }
- }
- void push(epoch_t e, off_t o) {
- assert(num < 4);
- if (num > 2 &&
- epoch[num-1] == e &&
- epoch[num-2] == (e-1))
- num--; // tail was an epoch boundary; replace it.
- epoch[num] = e;
- offset[num] = o;
- num++;
- }
- epoch_t last_epoch() {
- return epoch[num-1];
- }
- } header;
-
- struct entry_header_t {
- uint64_t epoch;
- uint64_t len;
- uint64_t magic1;
- uint64_t magic2;
-
- void make_magic(off_t pos, uint64_t fsid) {
- magic1 = pos;
- magic2 = fsid ^ epoch ^ len;
- }
- bool check_magic(off_t pos, uint64_t fsid) {
- return
- magic1 == (uint64_t)pos &&
- magic2 == (fsid ^ epoch ^ len);
- }
- };
-
-private:
- string fn;
-
- bool full, writing;
- off_t write_pos; // byte where next entry written goes
- off_t read_pos; //
-
- int fd;
-
- // to be journaled
- list<pair<epoch_t,bufferlist> > writeq;
- list<Context*> commitq;
-
- // being journaled
- list<Context*> writingq;
-
- // write thread
- Mutex write_lock;
- Cond write_cond;
- bool write_stop;
-
- void print_header();
- void read_header();
- void write_header();
- void start_writer();
- void stop_writer();
- void write_thread_entry();
-
- class Writer : public Thread {
- DioJournal *journal;
- public:
- Writer(DioJournal *fj) : journal(fj) {}
- void *entry() {
- journal->write_thread_entry();
- return 0;
- }
- } write_thread;
-
- public:
- DioJournal(Ebofs *e, const char *f) :
- Journal(e), fn(f),
- full(false), writing(false),
- write_pos(0), read_pos(0),
- fd(0),
- write_stop(false), write_thread(this) { }
- ~DioJournal() {}
-
- int create();
- int open();
- void close();
-
- void make_writeable();
-
- // writes
- void submit_entry(bufferlist& e, Context *oncommit); // submit an item
- void commit_epoch_start(); // mark epoch boundary
- void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
-
- bool read_entry(bufferlist& bl, epoch_t& e);
-
- bool is_full();
-
- // reads
-};
-
-#endif
// get size
struct stat st;
::fstat(fd, &st);
- dout(2) << "create " << fn << " " << st.st_size << " bytes" << dendl;
+ block_size = st.st_blksize;
+ dout(2) << "create " << fn << " " << st.st_size << " bytes, block size " << block_size << dendl;
// write empty header
memset(&header, 0, sizeof(header));
// writeable.
read_pos = 0;
- write_pos = sizeof(header);
+ write_pos = get_top();
::close(fd);
// assume writeable, unless...
read_pos = 0;
- write_pos = sizeof(header);
+ write_pos = get_top();
// read header?
read_header();
}
void FileJournal::read_header()
{
+ int r;
dout(10) << "read_header" << dendl;
- memset(&header, 0, sizeof(header)); // zero out (read may fail)
::lseek(fd, 0, SEEK_SET);
- int r = ::read(fd, &header, sizeof(header));
+ if (directio) {
+ buffer::ptr bp = buffer::create_page_aligned(block_size);
+ bp.zero();
+ r = ::read(fd, bp.c_str(), bp.length());
+ memcpy(&header, bp.c_str(), sizeof(header));
+ } else {
+ memset(&header, 0, sizeof(header)); // zero out (read may fail)
+ r = ::read(fd, &header, sizeof(header));
+ }
if (r < 0)
dout(0) << "read_header error " << errno << " " << strerror(errno) << dendl;
print_header();
}
void FileJournal::write_header()
{
+ int r;
dout(10) << "write_header " << dendl;
print_header();
-
::lseek(fd, 0, SEEK_SET);
- int r = ::write(fd, &header, sizeof(header));
+ if (directio) {
+ buffer::ptr bp = buffer::create_page_aligned(block_size);
+ bp.zero();
+ memcpy(bp.c_str(), &header, sizeof(header));
+ r = ::write(fd, bp.c_str(), bp.length());
+ } else {
+ r = ::write(fd, &header, sizeof(header));
+ }
if (r < 0)
dout(0) << "write_header error " << errno << " " << strerror(errno) << dendl;
}
// is there room if we wrap?
if ((off_t)sizeof(header_t) + size < header.offset[0]) {
// yes!
- dout(10) << "wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl;
+ dout(10) << "wrapped from " << queue_pos << " to " << get_top() << dendl;
header.wrap = queue_pos;
- queue_pos = sizeof(header_t);
+ queue_pos = get_top();
header.push(ebofs->get_super_epoch(), queue_pos);
write_header();
} else {
dout(10) << "write_thread_entry finish" << dendl;
}
+void FileJournal::write_thread_entry_dio()
+{
+ dout(10) << "write_thread_entry start" << dendl;
+ write_lock.Lock();
+
+ while (!write_stop) {
+ if (writeq.empty()) {
+ // sleep
+ dout(20) << "write_thread_entry going to sleep" << dendl;
+ write_cond.Wait(write_lock);
+ dout(20) << "write_thread_entry woke up" << dendl;
+ continue;
+ }
+
+ // grab next item
+ epoch_t epoch = writeq.front().first;
+ bufferlist &ebl = writeq.front().second;
+
+ // epoch boundary?
+ if (epoch > header.last_epoch()) {
+ dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
+ header.push(epoch, write_pos);
+ }
+
+ off_t size = 2*sizeof(entry_header_t) + ebl.length();
+ size = DIV_ROUND_UP(size, 4096) * 4096;
+
+ // does it fit?
+ if (header.wrap) {
+ // we're wrapped. don't overwrite ourselves.
+ if (write_pos + size >= header.offset[0]) {
+ dout(10) << "JOURNAL FULL (and wrapped), " << write_pos << "+" << size
+ << " >= " << header.offset[0]
+ << dendl;
+ full = true;
+ writeq.clear();
+ print_header();
+ continue;
+ }
+ } else {
+ // we haven't wrapped.
+ if (write_pos + size >= header.max_size) {
+ // is there room if we wrap?
+ if ((off_t)sizeof(header_t) + size < header.offset[0]) {
+ // yes!
+ dout(10) << "wrapped from " << write_pos << " to " << sizeof(header_t) << dendl;
+ header.wrap = write_pos;
+ write_pos = sizeof(header_t);
+ header.push(ebofs->get_super_epoch(), write_pos);
+ write_header();
+ } else {
+ // no room.
+ dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << write_pos << "+" << size
+ << " >= " << header.max_size
+ << dendl;
+ full = true;
+ writeq.clear();
+ continue;
+ }
+ }
+ }
+
+ // build it
+ bufferptr bp = buffer::create_page_aligned(size);
+ entry_header_t *h = (entry_header_t*)bp.c_str();
+ ebl.copy(0, ebl.length(), bp.c_str()+sizeof(entry_header_t));
+
+ // add to write buffer
+ dout(15) << "write_thread_entry will write " << write_pos << " : "
+ << bp.length()
+ << " epoch " << epoch
+ << dendl;
+
+ // add it this entry
+ h->epoch = epoch;
+ h->len = ebl.length();
+ h->make_magic(write_pos, header.fsid);
+ memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h));
+
+ Context *oncommit = commitq.front();
+ if (oncommit)
+ writingq.push_back(oncommit);
+
+ // pop from writeq
+ writeq.pop_front();
+ commitq.pop_front();
+
+ writing = true;
+ write_lock.Unlock();
+ dout(15) << "write_thread_entry writing " << write_pos << "~" << bp.length() << dendl;
+
+ ::lseek(fd, write_pos, SEEK_SET);
+ ::write(fd, bp.c_str(), size);
+
+ write_lock.Lock();
+ writing = false;
+ write_pos += bp.length();
+ ebofs->queue_finishers(writingq);
+ }
+ write_lock.Unlock();
+ dout(10) << "write_thread_entry finish" << dendl;
+}
+
bool FileJournal::is_full()
{
Mutex::Locker locker(write_lock);
// full journal damage control.
dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl;
header.clear();
- write_pos = sizeof(header_t);
+ write_pos = get_top();
} else {
// update header -- trim/discard old (committed) epochs
while (header.num && header.epoch[0] < ebofs->get_super_epoch())
if (read_pos)
write_pos = read_pos;
else
- write_pos = sizeof(header_t);
+ write_pos = get_top();
read_pos = 0;
}