int flags;
if (forwrite) {
- flags = O_RDONLY;
- } else {
flags = O_RDWR;
if (directio) flags |= O_DIRECT;
+ } else {
+ flags = O_RDONLY;
}
if (fd >= 0)
// get size
struct stat st;
- ::fstat(fd, &st);
+ int r = ::fstat(fd, &st);
+ assert(r == 0);
max_size = st.st_size;
block_size = st.st_blksize;
- dout(2) << "_open " << fn << " " << st.st_size << " bytes, block size " << block_size << dendl;
+ dout(2) << "_open " << fn << " fd " << fd
+ << ": " << st.st_size << " bytes, block size " << block_size << dendl;
return 0;
}
header.fsid = ebofs->get_fsid();
header.max_size = max_size;
header.block_size = block_size;
- write_header();
-
- // writeable.
- read_pos = 0;
- write_pos = get_top();
+ if (directio)
+ header.alignment = block_size;
+ else
+ header.alignment = 16; // at least stay word aligned on 64bit machines...
+ print_header();
+
+ buffer::ptr bp = prepare_header();
+ int r = ::pwrite(fd, bp.c_str(), bp.length(), 0);
+ if (r < 0) {
+ dout(0) << "create write header error " << errno << " " << strerror(errno) << dendl;
+ return -errno;
+ }
::close(fd);
fd = -1;
read_header();
if (header.fsid != ebofs->get_fsid()) {
dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl;
+ err = -EINVAL;
}
- else if (header.max_size > max_size) {
- dout(2) << "open old header has mismatched max size, discarding" << dendl;
+ if (header.max_size > max_size) {
+ dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
+ err = -EINVAL;
}
- else if (header.block_size != block_size) {
- dout(2) << "open old header has mismatched block size, discarding" << dendl;
+ if (header.block_size != block_size) {
+ dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
+ err = -EINVAL;
}
- else if (header.num > 0) {
- // valid header.
-
+ if (header.alignment != block_size && directio) {
+ derr(0) << "open journal alignment " << header.alignment << " does not match block size "
+ << block_size << " (required for direct_io journal mode)" << dendl;
+ err = -EINVAL;
+ }
+ if (err)
+ return err;
+
+ // looks like a valid header.
+ write_pos = 0; // not writeable yet
+ read_pos = 0;
+
+ if (header.num > 0) {
// pick an offset
for (int i=0; i<header.num; i++) {
if (header.epoch[i] == ebofs->get_super_epoch()) {
break;
}
}
+
+ if (read_pos == 0) {
+ dout(0) << "no valid journal segments" << dendl;
+ return -EINVAL;
+ }
+
+ } else {
+ dout(0) << "journal was empty" << dendl;
+ read_pos = get_top();
}
return 0;
return bp;
}
-void FileJournal::write_header()
-{
- buffer::ptr bp = prepare_header();
- dout(10) << "write_header writing " << bp.length() << dendl;
- print_header();
-
- int r = ::pwrite(fd, bp.c_str(), bp.length(), 0);
- if (r < 0)
- dout(0) << "write_header error " << errno << " " << strerror(errno) << dendl;
-}
-void FileJournal::check_for_wrap(epoch_t epoch, off_t pos, off_t size)
+void FileJournal::check_for_wrap(epoch_t epoch, off64_t pos, off64_t size)
{
// epoch boundary?
dout(10) << "check_for_wrap epoch " << epoch << " last " << header.last_epoch() << " of " << header.num << dendl;
void FileJournal::prepare_multi_write(bufferlist& bl)
{
// gather queued writes
- off_t queue_pos = write_pos;
+ off64_t queue_pos = write_pos;
int eleft = g_conf.ebofs_journal_max_write_entries;
int bleft = g_conf.ebofs_journal_max_write_bytes;
// grab next item
epoch_t epoch = writeq.front().first;
bufferlist &ebl = writeq.front().second;
- off_t size = 2*sizeof(entry_header_t) + ebl.length();
+ off64_t size = 2*sizeof(entry_header_t) + ebl.length();
if (bl.length() > 0 && bleft > 0 && bleft < size) break;
// add to write buffer
dout(15) << "prepare_multi_write will write " << queue_pos << " : "
- << ebl.length()
- << " epoch " << epoch
- << dendl;
+ << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
// add this entry
entry_header_t h;
h.epoch = epoch;
h.len = ebl.length();
- h.make_magic(write_pos, header.fsid);
+ h.make_magic(queue_pos, header.fsid);
bl.append((const char*)&h, sizeof(h));
bl.claim_append(ebl);
bl.append((const char*)&h, sizeof(h));
// pop from writeq
writeq.pop_front();
commitq.pop_front();
+
+ queue_pos += size;
if (--eleft == 0) break;
bleft -= size;
if (bleft == 0) break;
epoch_t epoch = writeq.front().first;
bufferlist &ebl = writeq.front().second;
- off_t size = 2*sizeof(entry_header_t) + ebl.length();
- size = DIV_ROUND_UP(size, 4096) * 4096;
+ off64_t size = 2*sizeof(entry_header_t) + ebl.length();
+ size = ROUND_UP_2(size, header.alignment);
check_for_wrap(epoch, write_pos, size);
if (full) return false;
// build it
dout(15) << "prepare_single_dio_write will write " << write_pos << " : "
- << size << " epoch " << epoch << dendl;
+ << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
bufferptr bp = buffer::create_page_aligned(size);
entry_header_t *h = (entry_header_t*)bp.c_str();
::pwrite(fd, hbp.c_str(), hbp.length(), 0);
// entry
- off_t pos = write_pos;
+ off64_t pos = write_pos;
+ ::lseek64(fd, write_pos, SEEK_SET);
for (list<bufferptr>::const_iterator it = bl.buffers().begin();
it != bl.buffers().end();
it++) {
if ((*it).length() == 0) continue; // blank buffer.
- ::pwrite(fd, (char*)(*it).c_str(), (*it).length(), pos);
+ int r = ::write(fd, (char*)(*it).c_str(), (*it).length());
+ if (r < 0)
+ derr(0) << "do_write failed with " << errno << " " << strerror(errno)
+ << " with " << (void*)(*it).c_str() << " len " << (*it).length()
+ << dendl;
pos += (*it).length();
}
- ::fdatasync(fd);
+ if (!directio)
+ ::fdatasync(fd);
write_lock.Lock();
writing = false;
if (memcmp(&old_header, &header, sizeof(header)) == 0) {
write_pos += bl.length();
+ write_pos = ROUND_UP_2(write_pos, header.alignment);
ebofs->queue_finishers(writingq);
} else {
+ dout(10) << "do_write finished write but header changed? not moving write_pos." << dendl;
derr(0) << "do_write finished write but header changed? not moving write_pos." << dendl;
assert(writingq.empty());
}
// header
entry_header_t h;
- ::pread(fd, &h, sizeof(h), read_pos);
+ ::lseek64(fd, read_pos, SEEK_SET);
+ ::read(fd, &h, sizeof(h));
if (!h.check_magic(read_pos, header.fsid)) {
dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl;
return false;
if (!f.check_magic(read_pos, header.fsid) ||
h.epoch != f.epoch ||
h.len != f.len) {
- dout(2) << "read_entry " << read_pos << " : bad footer magic, partially entry, end of journal" << dendl;
+ dout(2) << "read_entry " << read_pos << " : bad footer magic, partial entry, end of journal" << dendl;
return false;
}
epoch = h.epoch;
read_pos += 2*sizeof(entry_header_t) + h.len;
+ read_pos = ROUND_UP_2(read_pos, header.alignment);
return true;
}
* (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
*/
struct header_t {
- uint64_t fsid;
- int num;
- off_t wrap;
- off_t max_size;
- size_t block_size;
- epoch_t epoch[4];
- off_t offset[4];
+ __u64 fsid;
+ __s64 num;
+ __u32 block_size;
+ __u32 alignment;
+ __s64 max_size;
+ __s64 wrap;
+ __u32 epoch[4];
+ __s64 offset[4];
- header_t() : fsid(0), num(0), wrap(0), max_size(0), block_size(0) {}
+ header_t() : fsid(0), num(0), block_size(0), alignment(0), max_size(0), wrap(0) {}
void clear() {
num = 0;
offset[i] = offset[i+1];
}
}
- void push(epoch_t e, off_t o) {
+ void push(epoch_t e, off64_t o) {
assert(num < 4);
if (num > 2 &&
epoch[num-1] == e &&
uint64_t magic1;
uint64_t magic2;
- void make_magic(off_t pos, uint64_t fsid) {
+ void make_magic(off64_t pos, uint64_t fsid) {
magic1 = pos;
magic2 = fsid ^ epoch ^ len;
}
- bool check_magic(off_t pos, uint64_t fsid) {
+ bool check_magic(off64_t pos, uint64_t fsid) {
return
magic1 == (uint64_t)pos &&
magic2 == (fsid ^ epoch ^ len);
private:
string fn;
- off_t max_size;
+ off64_t max_size;
size_t block_size;
bool directio;
bool full, writing, must_write_header;
- off_t write_pos; // byte where next entry written goes
- off_t read_pos; //
+ off64_t write_pos; // byte where next entry written goes
+ off64_t read_pos; //
int fd;
void print_header();
void read_header();
bufferptr prepare_header();
- void write_header();
void start_writer();
void stop_writer();
void write_thread_entry();
- void check_for_wrap(epoch_t epoch, off_t pos, off_t size);
+ void check_for_wrap(epoch_t epoch, off64_t pos, off64_t size);
bool prepare_single_dio_write(bufferlist& bl);
void prepare_multi_write(bufferlist& bl);
void do_write(bufferlist& bl);
}
} write_thread;
- off_t get_top() {
+ off64_t get_top() {
if (directio)
return block_size;
else