dupstore: dupstore.cc config.cc ebofs.o common/Clock.o common/Timer.o osd/FakeStore.o
${CXX} ${CFLAGS} ${LIBS} $^ -o $@
+allebofs: mkfs.ebofs test.ebofs streamtest.ebofs dupstore
+
# hadoop
libhadoopcephfs.so: client/hadoop/CephFSInterface.cc client.o osdc.o msg/SimpleMessenger.o common.o
// Journal error-log prefix. When level x is <= g_conf.debug_ebofs, begins a
// line on *_derr made of dbeginl, the current g_clock time and
// " ebofs(<device name>).journal "; the caller appends the message with <<.
// Expands to a bare `if`, and needs a pointer named `ebofs` in scope.
#define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
-int FileJournal::_open()
+int FileJournal::_open(bool forwrite)
{
- int flags = O_RDWR;
- if (directio) flags |= O_DIRECT;
+ // Choose the open(2) access mode. Callers pass forwrite=true right before
+ // they write (create() writes the header, make_writeable() starts the
+ // writer), so that path needs O_RDWR, plus O_DIRECT when direct I/O is
+ // enabled. The read-only path (forwrite=false) only needs O_RDONLY.
+ int flags;
+
+ if (forwrite) {
+ flags = O_RDWR;
+ if (directio) flags |= O_DIRECT;
+ } else {
+ flags = O_RDONLY;
+ }
+ if (fd >= 0)
+ ::close(fd);
fd = ::open(fn.c_str(), flags);
if (fd < 0) {
dout(2) << "_open failed " << errno << " " << strerror(errno) << dendl;
return -errno;
}
- assert(fd > 0);
// get size
struct stat st;
{
dout(2) << "create " << fn << dendl;
- int err = _open();
+ int err = _open(true);
if (err < 0) return err;
// write empty header
write_pos = get_top();
::close(fd);
+ fd = -1;
dout(2) << "create done" << dendl;
return 0;
}
{
dout(2) << "open " << fn << dendl;
- int err = _open();
+ int err = _open(false);
if (err < 0) return err;
// assume writeable, unless...
}
}
}
- write_header();
- start_writer();
return 0;
}
assert(commitq.empty());
assert(fd > 0);
::close(fd);
- fd = 0;
+ fd = -1;
}
void FileJournal::start_writer()
}
}
-
bool FileJournal::prepare_single_dio_write(bufferlist& bl)
{
// grab next item
writing = true;
+ header_t old_header = header;
+
write_lock.Unlock();
dout(15) << "do_write writing " << write_pos << "~" << bl.length()
::pwrite(fd, (char*)(*it).c_str(), (*it).length(), pos);
pos += (*it).length();
}
+ ::fdatasync(fd);
write_lock.Lock();
writing = false;
- write_pos += bl.length();
- ebofs->queue_finishers(writingq);
+ if (memcmp(&old_header, &header, sizeof(header)) == 0) {
+ write_pos += bl.length();
+ ebofs->queue_finishers(writingq);
+ } else {
+ derr(0) << "do_write finished write but header changed? not moving write_pos." << dendl;
+ assert(writingq.empty());
+ }
}
dout(1) << " journal FULL, ignoring this epoch" << dendl;
return;
}
-
+
dout(1) << " clearing FULL flag, journal now usable" << dendl;
full = false;
}
// Prepare the journal for appending: reopen the file via _open(true), pick
// the offset where new entries go, flag the header for rewriting and call
// start_writer().
void FileJournal::make_writeable()
{
+ _open(true);  // NOTE(review): the int result of _open() is discarded here
+
// continue at read_pos when it is nonzero, otherwise start at get_top()
if (read_pos)
write_pos = read_pos;
else
write_pos = get_top();
read_pos = 0;
+
+ must_write_header = true;  // presumably makes the writer emit the header first -- TODO confirm
+ start_writer();
}
// header
entry_header_t h;
- ::lseek(fd, read_pos, SEEK_SET);
- ::read(fd, &h, sizeof(h));
+ ::pread(fd, &h, sizeof(h), read_pos);
if (!h.check_magic(read_pos, header.fsid)) {
dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl;
return false;
Cond write_cond;
bool write_stop;
- int _open();
+ int _open(bool wr);
void print_header();
void read_header();
bufferptr prepare_header();
directio(dio),
full(false), writing(false), must_write_header(false),
write_pos(0), read_pos(0),
- fd(0),
+ fd(-1),
write_stop(false), write_thread(this) { }
~FileJournal() {}
#include "ebofs/Ebofs.h"
-int main(int argc, char **argv)
+int main(int argc, const char **argv)
{
// args
- vector<char*> args;
+ vector<const char*> args;
argv_to_vec(argc, argv, args);
parse_config_options(args);
cerr << "usage: mkfs.ebofs [options] <device file>" << std::endl;
return -1;
}
- char *filename = args[0];
+ const char *filename = args[0];
// mkfs
Ebofs mfs(filename);
#include <iostream>
#include "ebofs/Ebofs.h"
-map<off_t, pair<utime_t,utime_t> > writes;
+// Timestamps recorded for one write: when it was started, acked and committed.
+struct io {
+ utime_t start, ack, commit;
+ // True once both the ack and the commit stamp have a nonzero sec() value.
+ bool done() {
+ if (!ack.sec())
+ return false;
+ return commit.sec() ? true : false;
+ }
+};
+map<off_t,io> writes;
Mutex lock;
+
+// Log "<off>\t<ack - start>\t<commit - start>" for the write at 'off', then
+// drop its record from 'writes'. Takes no lock itself; the callers visible
+// here (set_ack / set_commit) hold 'lock' while calling it.
+void pr(off_t off)
+{
+ io &rec = writes[off];
+ dout(0) << off
+ << "\t" << (rec.ack - rec.start)
+ << "\t" << (rec.commit - rec.start)
+ << dendl;
+ writes.erase(off);
+}
+
+// Record 't' as the start time of the write at offset 'off', under 'lock'.
+void set_start(off_t off, utime_t t)
+{
+ Mutex::Locker guard(lock);
+ io &rec = writes[off];
+ rec.start = t;
+}
+
+// Record 't' as the ack time of the write at 'off'; if the commit time is
+// already in as well, print and retire the record via pr().
+void set_ack(off_t off, utime_t t)
+{
+ Mutex::Locker guard(lock);
+ io &rec = writes[off];
+ rec.ack = t;
+ if (!rec.done())
+ return;
+ pr(off);
+}
+
+// Record 't' as the commit time of the write at 'off'; if the ack time is
+// already in as well, print and retire the record via pr().
+void set_commit(off_t off, utime_t t)
+{
+ Mutex::Locker guard(lock);
+ io &rec = writes[off];
+ rec.commit = t;
+ if (!rec.done())
+ return;
+ pr(off);
+}
+
+
struct C_Commit : public Context {
off_t off;
C_Commit(off_t o) : off(o) {}
void finish(int r) {
Mutex::Locker l(lock);
- utime_t now = g_clock.now();
- dout(0) << off << "\t"
- << (writes[off].second-writes[off].first) << "\t"
- << (now - writes[off].first) << dendl;
- writes.erase(off);
+ set_commit(off, g_clock.now());
}
};
cout << "# offset\tack\tcommit" << std::endl;
while (now < end) {
object_t oid(1,1);
- utime_t start;
- {
- Mutex::Locker l(lock);
- start = writes[pos].first = now;
- }
+ utime_t start = now;
+ set_start(pos, now);
fs.write(oid, pos, bytes, bl, new C_Commit(pos));
now = g_clock.now();
- {
- Mutex::Locker l(lock);
- writes[pos].second = now;
- }
+ set_ack(pos, now);
pos += bytes;
// wait?
coll_t cid = rand() % 50;
off_t off = rand() % 10000;//0;//rand() % 1000000;
off_t len = 1+rand() % 100000;
- char *a = "one";
+ const char *a = "one";
if (rand() % 2) a = "two";
int l = 3;//rand() % 10;
}
};
-int main(int argc, char **argv)
+int main(int argc, const char **argv)
{
- vector<char*> args;
+ vector<const char*> args;
argv_to_vec(argc, argv, args);
parse_config_options(args);
// args
if (args.size() != 3) return -1;
- char *filename = args[0];
+ const char *filename = args[0];
int seconds = atoi(args[1]);
int threads = atoi(args[2]);
if (!threads) threads = 1;