From 8a104be73bc3d95875fa5d7193cb5db22b871756 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 24 Jan 2008 01:47:03 -0800 Subject: [PATCH] simplified FileJournal, journal interface --- src/ebofs/DIOJournal.cc | 469 +++++++++++++++++++++++++++++++++++++++ src/ebofs/DIOJournal.h | 149 +++++++++++++ src/ebofs/Ebofs.cc | 300 +++++++++++-------------- src/ebofs/Ebofs.h | 16 +- src/ebofs/FileJournal.cc | 261 +++++++++++----------- src/ebofs/FileJournal.h | 25 ++- src/ebofs/Journal.h | 3 +- src/ebofs/Onode.h | 2 +- src/ebofs/streamtest.cc | 13 +- 9 files changed, 911 insertions(+), 327 deletions(-) create mode 100644 src/ebofs/DIOJournal.cc create mode 100644 src/ebofs/DIOJournal.h diff --git a/src/ebofs/DIOJournal.cc b/src/ebofs/DIOJournal.cc new file mode 100644 index 0000000000000..75eb408215454 --- /dev/null +++ b/src/ebofs/DIOJournal.cc @@ -0,0 +1,469 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "DIOJournal.h" +#include "Ebofs.h" + +#include +#include +#include +#include + + +#include "config.h" + +#define dout(x) if (x <= g_conf.debug_ebofs) *_dout << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal " +#define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal " + + +int DIOJournal::create() +{ + dout(2) << "create " << fn << dendl; + + // open/create + fd = ::open(fn.c_str(), O_RDWR|O_DIRECT); + if (fd < 0) { + dout(2) << "create failed " << errno << " " << strerror(errno) << dendl; + return -errno; + } + assert(fd > 0); + + //::ftruncate(fd, 0); + //::fchmod(fd, 0644); + + // get size + struct stat st; + ::fstat(fd, &st); + dout(2) << "create " << fn << " " << st.st_size << " bytes" << dendl; + + // write empty header + memset(&header, 0, sizeof(header)); + header.clear(); + header.fsid = ebofs->get_fsid(); + header.max_size = st.st_size; + write_header(); + + // writeable. + read_pos = 0; + write_pos = queue_pos = sizeof(header); + + ::close(fd); + + return 0; +} + +int DIOJournal::open() +{ + //dout(1) << "open " << fn << dendl; + + // open and file + assert(fd == 0); + fd = ::open(fn.c_str(), O_RDWR|O_SYNC); + if (fd < 0) { + dout(2) << "open failed " << errno << " " << strerror(errno) << dendl; + return -errno; + } + assert(fd > 0); + + // assume writeable, unless... + read_pos = 0; + write_pos = queue_pos = sizeof(header); + + // read header? + read_header(); + if (header.fsid != ebofs->get_fsid()) { + dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl; + } + else if (header.num > 0) { + // valid header, pick an offset + for (int i=0; iget_super_epoch()) { + dout(2) << "using read_pos header pointer " + << header.epoch[i] << " at " << header.offset[i] + << dendl; + read_pos = header.offset[i]; + write_pos = queue_pos = 0; + break; + } + else if (header.epoch[i] < ebofs->get_super_epoch()) { + dout(2) << "super_epoch is " << ebofs->get_super_epoch() + << ", skipping old " << header.epoch[i] << " at " << header.offset[i] + << dendl; + } + else if (header.epoch[i] > ebofs->get_super_epoch()) { + dout(2) << "super_epoch is " << ebofs->get_super_epoch() + << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i] + << dendl; + break; + } + } + } + + start_writer(); + + return 0; +} + +void DIOJournal::close() +{ + dout(1) << "close " << fn << dendl; + + // stop writer thread + stop_writer(); + + // close + assert(writeq.empty()); + assert(commitq.empty()); + assert(fd > 0); + ::close(fd); + fd = 0; +} + +void DIOJournal::start_writer() +{ + write_stop = false; + write_thread.create(); +} + +void DIOJournal::stop_writer() +{ + write_lock.Lock(); + { + write_stop = true; + write_cond.Signal(); + } + write_lock.Unlock(); + write_thread.join(); +} + + +void DIOJournal::print_header() +{ + for (int i=0; iepoch = epoch; + h->len = len;//bl.length(); + h->make_magic(write_pos, header.fsid); + + ::lseek(fd, write_pos, SEEK_SET); + /* + ::write(fd, &h, sizeof(h)); + for (list::const_iterator it = bl.buffers().begin(); + it != bl.buffers().end(); + it++) { + if ((*it).length() == 0) continue; // blank buffer. + ::write(fd, (char*)(*it).c_str(), (*it).length() ); + } + ::write(fd, &h, sizeof(h)); + ::fsync(fd); + */ + int r = ::write(fd, bp.c_str(), dolen); + if (r != dolen) derr(0) << "write got " << r << " not " << dolen << dendl; + + // move position pointer + write_pos += dolen; + + if (oncommit) { + if (1) { + // queue callback + ebofs->queue_finisher(oncommit); + } else { + // callback now + oncommit->finish(0); + delete oncommit; + } + } + } + } + + write_lock.Unlock(); + dout(10) << "write_thread_entry finish" << dendl; +} + +bool DIOJournal::submit_entry(bufferlist& e, Context *oncommit) +{ + assert(queue_pos != 0); // bad create(), or journal didn't replay to completion. + + // ** lock ** + Mutex::Locker locker(write_lock); + + // wrap? full? + off_t size = 2*sizeof(entry_header_t) + e.length(); + + if (full) return false; // already marked full. + + if (header.wrap) { + // we're wrapped. don't overwrite ourselves. + if (queue_pos + size >= header.offset[0]) { + derr(0) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size + << " >= " << header.offset[0] + << dendl; + full = true; + print_header(); + return false; + } + } else { + // we haven't wrapped. + if (queue_pos + size >= header.max_size) { + // is there room if we wrap? + if ((off_t)sizeof(header_t) + size < header.offset[0]) { + // yes! + dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl; + header.wrap = queue_pos; + queue_pos = sizeof(header_t); + header.push(ebofs->get_super_epoch(), queue_pos); + } else { + // no room. + derr(0) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size + << " >= " << header.max_size + << dendl; + full = true; + return false; + } + } + } + + dout(10) << "submit_entry " << queue_pos << " : " << e.length() + << " epoch " << ebofs->get_super_epoch() + << " " << oncommit << dendl; + + // dump on queue + writeq.push_back(pair(ebofs->get_super_epoch(), e)); + commitq.push_back(oncommit); + + queue_pos += size; + + // kick writer thread + write_cond.Signal(); + + return true; +} + + +void DIOJournal::commit_epoch_start() +{ + dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1 + << " -- new epoch " << ebofs->get_super_epoch() + << dendl; + + Mutex::Locker locker(write_lock); + + // was full -> empty -> now usable? + if (full) { + if (header.num != 0) { + dout(1) << " journal FULL, ignoring this epoch" << dendl; + return; + } + + derr(0) << " clearing FULL flag, journal now usable" << dendl; + full = false; + } + + // note epoch boundary + header.push(ebofs->get_super_epoch(), queue_pos); // note: these entries may not yet be written. + //write_header(); // no need to write it now, though... +} + +void DIOJournal::commit_epoch_finish() +{ + dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << dendl; + + write_lock.Lock(); + { + if (full) { + // full journal damage control. + dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl; + header.clear(); + write_pos = queue_pos = sizeof(header_t); + } else { + // update header -- trim/discard old (committed) epochs + while (header.epoch[0] < ebofs->get_super_epoch()) + header.pop(); + } + write_header(); + + // discard any unwritten items in previous epoch, and do callbacks + epoch_t epoch = ebofs->get_super_epoch(); + list callbacks; + while (!writeq.empty() && writeq.front().first < epoch) { + dout(15) << " dropping unwritten and committed " + << write_pos << " : " << writeq.front().second.length() + << " epoch " << writeq.front().first + << dendl; + // finisher? + Context *oncommit = commitq.front(); + if (oncommit) callbacks.push_back(oncommit); + + write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length(); + + // discard. + writeq.pop_front(); + commitq.pop_front(); + } + + // queue the finishers + ebofs->queue_finishers(callbacks); + } + write_lock.Unlock(); + +} + + +void DIOJournal::make_writeable() +{ + if (read_pos) + write_pos = queue_pos = read_pos; + else + write_pos = queue_pos = sizeof(header_t); + read_pos = 0; +} + + +bool DIOJournal::read_entry(bufferlist& bl, epoch_t& epoch) +{ + if (!read_pos) { + dout(2) << "read_entry -- not readable" << dendl; + return false; + } + + if (read_pos == header.wrap) { + // find wrap point + for (int i=1; i + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef __EBOFS_DIOJOURNAL_H +#define __EBOFS_DIOJOURNAL_H + + +#include "Journal.h" +#include "common/Cond.h" +#include "common/Mutex.h" +#include "common/Thread.h" + +class DIOJournal : public Journal { +public: + /** log header + * we allow 4 pointers: + * top/initial, + * one for an epoch boundary (if any), + * one for a wrap in the ring buffer/journal file, + * one for a second epoch boundary (if any). + * the epoch boundary one is useful only for speedier recovery in certain cases + * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!) + */ + struct header_t { + uint64_t fsid; + int num; + off_t wrap; + off_t max_size; + epoch_t epoch[4]; + off_t offset[4]; + + header_t() : fsid(0), num(0), wrap(0), max_size(0) {} + + void clear() { + num = 0; + wrap = 0; + } + void pop() { + if (num >= 2 && offset[0] > offset[1]) + wrap = 0; // we're eliminating a wrap + num--; + for (int i=0; i 2 && + epoch[num-1] == e && + epoch[num-2] == (e-1)) + num--; // tail was an epoch boundary; replace it. + epoch[num] = e; + offset[num] = o; + num++; + } + } header; + + struct entry_header_t { + uint64_t epoch; + uint64_t len; + uint64_t magic1; + uint64_t magic2; + + void make_magic(off_t pos, uint64_t fsid) { + magic1 = pos; + magic2 = fsid ^ epoch ^ len; + } + bool check_magic(off_t pos, uint64_t fsid) { + return + magic1 == (uint64_t)pos && + magic2 == (fsid ^ epoch ^ len); + } + }; + +private: + string fn; + + bool full; + off_t write_pos; // byte where next entry written goes + off_t queue_pos; // byte where next entry queued for write goes + + off_t read_pos; // + + int fd; + + list > writeq; // currently journaling + list commitq; // currently journaling + + // write thread + Mutex write_lock; + Cond write_cond; + bool write_stop; + + void print_header(); + void read_header(); + void write_header(); + void start_writer(); + void stop_writer(); + void write_thread_entry(); + + class Writer : public Thread { + DIOJournal *journal; + public: + Writer(DIOJournal *fj) : journal(fj) {} + void *entry() { + journal->write_thread_entry(); + return 0; + } + } write_thread; + + public: + DIOJournal(Ebofs *e, char *f) : + Journal(e), fn(f), + full(false), + write_pos(0), queue_pos(0), read_pos(0), + fd(0), + write_stop(false), write_thread(this) { } + ~DIOJournal() {} + + int create(); + int open(); + void close(); + + void make_writeable(); + + // writes + bool submit_entry(bufferlist& e, Context *oncommit); // submit an item + void commit_epoch_start(); // mark epoch boundary + void commit_epoch_finish(); // mark prior epoch as committed (we can expire) + + bool read_entry(bufferlist& bl, epoch_t& e); + + // reads +}; + +#endif diff --git a/src/ebofs/Ebofs.cc b/src/ebofs/Ebofs.cc index 9376a4f8f9539..5d1e5f31a2618 100644 --- a/src/ebofs/Ebofs.cc +++ b/src/ebofs/Ebofs.cc @@ -1432,17 +1432,14 @@ void Ebofs::sync(Context *onsafe) if (onsafe) { dirty = true; - while (1) { - if (journal) { - // journal empty transaction - Transaction t; - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + // journal empty transaction + Transaction t; + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } ebofs_lock.Unlock(); } @@ -2483,15 +2480,12 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) delete onsafe; // kill callback, but still journal below (in case transaction had side effects) onsafe = 0; } - while (1) { - if (journal) { - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); ebofs_lock.Unlock(); return r; @@ -2900,17 +2894,14 @@ int Ebofs::write(pobject_t oid, // commit waiter if (r > 0) { assert((size_t)r == len); - while (1) { - if (journal) { - Transaction t; - t.write(oid, off, len, bl); - bufferlist tbl; - t._encode(tbl); - if (journal->submit_entry(tbl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.write(oid, off, len, bl); + bufferlist tbl; + t._encode(tbl); + journal->submit_entry(tbl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -2929,17 +2920,14 @@ int Ebofs::zero(pobject_t oid, off_t off, size_t len, Context *onsafe) // commit waiter if (r > 0) { assert((size_t)r == len); - while (1) { - if (journal) { - Transaction t; - t.zero(oid, off, len); - bufferlist tbl; - t._encode(tbl); - if (journal->submit_entry(tbl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.zero(oid, off, len); + bufferlist tbl; + t._encode(tbl); + journal->submit_entry(tbl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -2973,17 +2961,14 @@ int Ebofs::remove(pobject_t oid, Context *onsafe) // journal, wait for commit if (r >= 0) { - while (1) { - if (journal) { - Transaction t; - t.remove(oid); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.remove(oid); + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -3059,17 +3044,14 @@ int Ebofs::truncate(pobject_t oid, off_t size, Context *onsafe) // journal, wait for commit if (r >= 0) { - while (1) { - if (journal) { - Transaction t; - t.truncate(oid, size); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.truncate(oid, size); + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -3088,17 +3070,14 @@ int Ebofs::clone(pobject_t from, pobject_t to, Context *onsafe) // journal, wait for commit if (r >= 0) { - while (1) { - if (journal) { - Transaction t; - t.clone(from, to); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.clone(from, to); + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -3275,17 +3254,14 @@ int Ebofs::setattr(pobject_t oid, const char *name, const void *value, size_t si // journal, wait for commit if (r >= 0) { - while (1) { - if (journal) { - Transaction t; - t.setattr(oid, name, value, size); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.setattr(oid, name, value, size); + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -3318,17 +3294,14 @@ int Ebofs::setattrs(pobject_t oid, map& attrset, Context *onsa // journal, wait for commit if (r >= 0) { - while (1) { - if (journal) { - Transaction t; - t.setattrs(oid, attrset); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.setattrs(oid, attrset); + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -3433,17 +3406,14 @@ int Ebofs::rmattr(pobject_t oid, const char *name, Context *onsafe) // journal, wait for commit if (r >= 0) { - while (1) { - if (journal) { - Transaction t; - t.rmattr(oid, name); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.rmattr(oid, name); + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -3539,17 +3509,14 @@ int Ebofs::create_collection(coll_t cid, Context *onsafe) // journal, wait for commit if (r >= 0) { - while (1) { - if (journal) { - Transaction t; - t.create_collection(cid); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.create_collection(cid); + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -3596,17 +3563,14 @@ int Ebofs::destroy_collection(coll_t cid, Context *onsafe) // journal, wait for commit if (r >= 0) { - while (1) { - if (journal) { - Transaction t; - t.remove_collection(cid); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.remove_collection(cid); + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -3660,17 +3624,14 @@ int Ebofs::collection_add(coll_t cid, pobject_t oid, Context *onsafe) // journal, wait for commit if (r >= 0) { - while (1) { - if (journal) { - Transaction t; - t.collection_add(cid, oid); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.collection_add(cid, oid); + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -3711,17 +3672,14 @@ int Ebofs::collection_remove(coll_t cid, pobject_t oid, Context *onsafe) // journal, wait for commit if (r >= 0) { - while (1) { - if (journal) { - Transaction t; - t.collection_remove(cid, oid); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.collection_remove(cid, oid); + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -3784,17 +3742,14 @@ int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, s // journal, wait for commit if (r >= 0) { - while (1) { - if (journal) { - Transaction t; - t.collection_setattr(cid, name, value, size); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.collection_setattr(cid, name, value, size); + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } @@ -3892,17 +3847,14 @@ int Ebofs::collection_rmattr(coll_t cid, const char *name, Context *onsafe) // journal, wait for commit if (r >= 0) { - while (1) { - if (journal) { - Transaction t; - t.collection_rmattr(cid, name); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; - } - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - break; - } + if (journal) { + Transaction t; + t.collection_rmattr(cid, name); + bufferlist bl; + t._encode(bl); + journal->submit_entry(bl, onsafe); + } else + queue_commit_waiter(onsafe); } else { if (onsafe) delete onsafe; } diff --git a/src/ebofs/Ebofs.h b/src/ebofs/Ebofs.h index b0ba503f65ed1..216c1428baa34 100644 --- a/src/ebofs/Ebofs.h +++ b/src/ebofs/Ebofs.h @@ -80,6 +80,13 @@ protected: public: uint64_t get_fsid() { return super_fsid; } epoch_t get_super_epoch() { return super_epoch; } + + void queue_commit_waiter(Context *oncommit) { + if (oncommit) + commit_waiters[super_epoch].push_back(oncommit); + } + + protected: @@ -239,13 +246,13 @@ protected: public: - Ebofs(const char *devfn, char *jfn=0) : + Ebofs(const char *devfn, const char *jfn=0) : fake_writes(false), dev(devfn), mounted(false), unmounting(false), dirty(false), readonly(false), super_epoch(0), commit_starting(false), commit_thread_started(false), commit_thread(this), - journalfn(jfn), journal(0), + journal(0), free_blocks(0), limbo_blocks(0), allocator(this), nodepool(ebofs_lock), @@ -258,7 +265,10 @@ protected: finisher_stop(false), finisher_thread(this) { for (int i=0; iget_super_epoch()) { @@ -190,131 +190,132 @@ void FileJournal::write_thread_entry() if (writeq.empty()) { // sleep dout(20) << "write_thread_entry going to sleep" << dendl; - assert(write_pos == queue_pos); write_cond.Wait(write_lock); dout(20) << "write_thread_entry woke up" << dendl; continue; } - // do queued writes + // gather queued writes + off_t queue_pos = write_pos; + bufferlist bl; + while (!writeq.empty()) { // grab next item epoch_t epoch = writeq.front().first; - bufferlist bl; - bl.claim(writeq.front().second); - writeq.pop_front(); - Context *oncommit = commitq.front(); - commitq.pop_front(); + bufferlist &ebl = writeq.front().second; + off_t size = 2*sizeof(entry_header_t) + ebl.length(); - // wrap? - if (write_pos == header.wrap) { - dout(15) << "write_thread_entry wrapped write_pos at " << write_pos << " to " << sizeof(header_t) << dendl; - assert(header.wrap == write_pos); - write_header(); - write_pos = sizeof(header_t); + // epoch boundary? + if (epoch > header.last_epoch()) { + dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl; + header.push(epoch, queue_pos); } - // write! - dout(15) << "write_thread_entry writing " << write_pos << " : " - << bl.length() - << " epoch " << epoch - << dendl; + // does it fit? + if (header.wrap) { + // we're wrapped. don't overwrite ourselves. + if (queue_pos + size >= header.offset[0]) { + if (queue_pos != write_pos) break; // do what we have, first + dout(10) << "JOURNAL FULL (and wrapped), " << queue_pos << "+" << size + << " >= " << header.offset[0] + << dendl; + full = true; + writeq.clear(); + print_header(); + break; + } + } else { + // we haven't wrapped. + if (queue_pos + size >= header.max_size) { + if (queue_pos != write_pos) break; // do what we have, first + // is there room if we wrap? + if ((off_t)sizeof(header_t) + size < header.offset[0]) { + // yes! + dout(10) << "wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl; + header.wrap = queue_pos; + queue_pos = sizeof(header_t); + header.push(ebofs->get_super_epoch(), queue_pos); + write_header(); + } else { + // no room. + dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size + << " >= " << header.max_size + << dendl; + full = true; + writeq.clear(); + break; + } + } + } + + // add to write buffer + dout(15) << "write_thread_entry will write " << queue_pos << " : " + << ebl.length() + << " epoch " << epoch + << dendl; - // write entry header + // add it this entry entry_header_t h; h.epoch = epoch; - h.len = bl.length(); + h.len = ebl.length(); h.make_magic(write_pos, header.fsid); + bl.append((const char*)&h, sizeof(h)); + bl.claim_append(ebl); + bl.append((const char*)&h, sizeof(h)); + + Context *oncommit = commitq.front(); + if (oncommit) + writingq.push_back(oncommit); + + // pop from writeq + writeq.pop_front(); + commitq.pop_front(); + break; + } - ::lseek(fd, write_pos, SEEK_SET); - ::write(fd, &h, sizeof(h)); + // write anything? + if (bl.length() > 0) { + writing = true; + write_lock.Unlock(); + dout(15) << "write_thread_entry writing " << write_pos << "~" << bl.length() << dendl; + ::lseek(fd, write_pos, SEEK_SET); for (list::const_iterator it = bl.buffers().begin(); it != bl.buffers().end(); it++) { if ((*it).length() == 0) continue; // blank buffer. ::write(fd, (char*)(*it).c_str(), (*it).length() ); } - - ::write(fd, &h, sizeof(h)); - // move position pointer - write_pos += 2*sizeof(entry_header_t) + bl.length(); - - if (oncommit) { - if (1) { - // queue callback - ebofs->queue_finisher(oncommit); - } else { - // callback now - oncommit->finish(0); - delete oncommit; - } - } + write_lock.Lock(); + writing = false; + write_pos = queue_pos; + ebofs->queue_finishers(writingq); } } - write_lock.Unlock(); dout(10) << "write_thread_entry finish" << dendl; } -bool FileJournal::submit_entry(bufferlist& e, Context *oncommit) +bool FileJournal::is_full() { - assert(queue_pos != 0); // bad create(), or journal didn't replay to completion. - - // ** lock ** Mutex::Locker locker(write_lock); + return full; +} - // wrap? full? - off_t size = 2*sizeof(entry_header_t) + e.length(); - - if (full) return false; // already marked full. +void FileJournal::submit_entry(bufferlist& e, Context *oncommit) +{ + Mutex::Locker locker(write_lock); // ** lock ** - if (header.wrap) { - // we're wrapped. don't overwrite ourselves. - if (queue_pos + size >= header.offset[0]) { - dout(10) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size - << " >= " << header.offset[0] - << dendl; - full = true; - print_header(); - return false; - } - } else { - // we haven't wrapped. - if (queue_pos + size >= header.max_size) { - // is there room if we wrap? - if ((off_t)sizeof(header_t) + size < header.offset[0]) { - // yes! - dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << dendl; - header.wrap = queue_pos; - queue_pos = sizeof(header_t); - header.push(ebofs->get_super_epoch(), queue_pos); - } else { - // no room. - dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size - << " >= " << header.max_size - << dendl; - full = true; - return false; - } - } - } - - dout(10) << "submit_entry " << queue_pos << " : " << e.length() + // dump on queue + dout(10) << "submit_entry " << e.length() << " epoch " << ebofs->get_super_epoch() << " " << oncommit << dendl; - - // dump on queue - writeq.push_back(pair(ebofs->get_super_epoch(), e)); commitq.push_back(oncommit); - - queue_pos += size; - - // kick writer thread - write_cond.Signal(); - - return true; + if (!full) { + writeq.push_back(pair(ebofs->get_super_epoch(), e)); + write_cond.Signal(); // kick writer thread + } } @@ -336,63 +337,53 @@ void FileJournal::commit_epoch_start() dout(1) << " clearing FULL flag, journal now usable" << dendl; full = false; } - - // note epoch boundary - header.push(ebofs->get_super_epoch(), queue_pos); // note: these entries may not yet be written. - //write_header(); // no need to write it now, though... } void FileJournal::commit_epoch_finish() { dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << dendl; - write_lock.Lock(); - { - if (full) { - // full journal damage control. - dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl; - header.clear(); - write_pos = queue_pos = sizeof(header_t); - } else { - // update header -- trim/discard old (committed) epochs - while (header.epoch[0] < ebofs->get_super_epoch()) - header.pop(); - } - write_header(); - - // discard any unwritten items in previous epoch, and do callbacks - epoch_t epoch = ebofs->get_super_epoch(); - list callbacks; - while (!writeq.empty() && writeq.front().first < epoch) { - dout(15) << " dropping unwritten and committed " - << write_pos << " : " << writeq.front().second.length() - << " epoch " << writeq.front().first - << dendl; - // finisher? - Context *oncommit = commitq.front(); - if (oncommit) callbacks.push_back(oncommit); - - write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length(); - - // discard. - writeq.pop_front(); - commitq.pop_front(); - } - - // queue the finishers - ebofs->queue_finishers(callbacks); - } - write_lock.Unlock(); - + Mutex::Locker locker(write_lock); + + if (full) { + // full journal damage control. + dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl; + header.clear(); + write_pos = sizeof(header_t); + } else { + // update header -- trim/discard old (committed) epochs + while (header.num && header.epoch[0] < ebofs->get_super_epoch()) + header.pop(); + } + write_header(); + + // discard any unwritten items in previous epoch + epoch_t epoch = ebofs->get_super_epoch(); + while (!writeq.empty() && writeq.front().first < epoch) { + dout(15) << " dropping unwritten and committed " + << write_pos << " : " << writeq.front().second.length() + << " epoch " << writeq.front().first + << dendl; + // finisher? + Context *oncommit = commitq.front(); + if (oncommit) writingq.push_back(oncommit); + + // discard. + writeq.pop_front(); + commitq.pop_front(); + } + + // queue the finishers + ebofs->queue_finishers(writingq); } void FileJournal::make_writeable() { if (read_pos) - write_pos = queue_pos = read_pos; + write_pos = read_pos; else - write_pos = queue_pos = sizeof(header_t); + write_pos = sizeof(header_t); read_pos = 0; } diff --git a/src/ebofs/FileJournal.h b/src/ebofs/FileJournal.h index 446adeb826c71..84b3a603f74a8 100644 --- a/src/ebofs/FileJournal.h +++ b/src/ebofs/FileJournal.h @@ -66,6 +66,9 @@ public: offset[num] = o; num++; } + epoch_t last_epoch() { + return epoch[num-1]; + } } header; struct entry_header_t { @@ -88,16 +91,18 @@ public: private: string fn; - bool full; + bool full, writing; off_t write_pos; // byte where next entry written goes - off_t queue_pos; // byte where next entry queued for write goes - off_t read_pos; // int fd; - list > writeq; // currently journaling - list commitq; // currently journaling + // to be journaled + list > writeq; + list commitq; + + // being journaled + list writingq; // write thread Mutex write_lock; @@ -122,10 +127,10 @@ private: } write_thread; public: - FileJournal(Ebofs *e, char *f) : + FileJournal(Ebofs *e, const char *f) : Journal(e), fn(f), - full(false), - write_pos(0), queue_pos(0), read_pos(0), + full(false), writing(false), + write_pos(0), read_pos(0), fd(0), write_stop(false), write_thread(this) { } ~FileJournal() {} @@ -137,12 +142,14 @@ private: void make_writeable(); // writes - bool submit_entry(bufferlist& e, Context *oncommit); // submit an item + void submit_entry(bufferlist& e, Context *oncommit); // submit an item void commit_epoch_start(); // mark epoch boundary void commit_epoch_finish(); // mark prior epoch as committed (we can expire) bool read_entry(bufferlist& bl, epoch_t& e); + bool is_full(); + // reads }; diff --git a/src/ebofs/Journal.h b/src/ebofs/Journal.h index 9bab0b7f3c109..1f738dbdaca9a 100644 --- a/src/ebofs/Journal.h +++ b/src/ebofs/Journal.h @@ -35,10 +35,11 @@ public: // writes virtual void make_writeable() = 0; - virtual bool submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item + virtual void submit_entry(bufferlist& e, Context *oncommit) = 0; virtual void commit_epoch_start() = 0; // mark epoch boundary virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire) virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0; + virtual bool is_full() = 0; // reads/recovery diff --git a/src/ebofs/Onode.h b/src/ebofs/Onode.h index aa0dca3175c2b..ad365d3f7e2b7 100644 --- a/src/ebofs/Onode.h +++ b/src/ebofs/Onode.h @@ -169,7 +169,7 @@ public: // allocation void verify_extents() { - if (1) { // do crazy stupid sanity checking + if (0) { // do crazy stupid sanity checking block_t count = 0, pos = 0; interval_set is; csum_t csum = 0; diff --git a/src/ebofs/streamtest.cc b/src/ebofs/streamtest.cc index 6ce7f843f1f9c..40d7671543f18 100644 --- a/src/ebofs/streamtest.cc +++ b/src/ebofs/streamtest.cc @@ -12,6 +12,8 @@ * */ +#define dout(x) if (x <= g_conf.debug_ebofs) *_dout << dbeginl + #include #include "ebofs/Ebofs.h" @@ -22,9 +24,9 @@ struct C_Commit : public Context { C_Commit(off_t o) : off(o) {} void finish(int r) { utime_t now = g_clock.now(); - cout << off << "\t" + dout(0) << off << "\t" << (writes[off].second-writes[off].first) << "\t" - << (now - writes[off].first) << std::endl; + << (now - writes[off].first) << dendl; writes.erase(off); } }; @@ -37,10 +39,13 @@ int main(int argc, const char **argv) parse_config_options(args); // args - if (args.size() != 3) return -1; + if (args.size() < 3) return -1; const char *filename = args[0]; int seconds = atoi(args[1]); int bytes = atoi(args[2]); + const char *journal = 0; + if (args.size() >= 4) + journal = args[3]; buffer::ptr bp(bytes); bp.zero(); @@ -51,7 +56,7 @@ int main(int argc, const char **argv) cout << "#dev " << filename << seconds << " seconds, " << bytes << " bytes per write" << std::endl; - Ebofs fs(filename); + Ebofs fs(filename, journal); if (fs.mkfs() < 0) { cout << "mkfs failed" << std::endl; return -1; -- 2.39.5