{
int flags, ret;
- if (aio && !directio) {
- derr << "FileJournal::_open: aio not supported without directio; disabling aio" << dendl;
- aio = false;
- }
-#ifndef HAVE_LIBAIO
- if (aio) {
- derr << "FileJournal::_open: libaio not compiled in; disabling aio" << dendl;
- aio = false;
- }
-#endif
-
if (forwrite) {
flags = O_RDWR;
if (directio)
return 0;
}
+// This can not be used on an active journal
int FileJournal::check()
{
int ret;
+ assert(fd == -1);
ret = _open(false, false);
if (ret)
- goto done;
+ return ret;
- ret = read_header();
+ ret = read_header(&header);
if (ret < 0)
goto done;
ret = 0;
done:
- VOID_TEMP_FAILURE_RETRY(::close(fd));
- fd = -1;
+ close();
return ret;
}
header.start = get_top();
header.start_seq = 0;
- print_header();
+ print_header(header);
// static zeroed buffer for alignment padding
delete [] zero_buf;
return ret;
}
+// This can not be used on an active journal
int FileJournal::peek_fsid(uuid_d& fsid)
{
+ assert(fd == -1);
int r = _open(false, false);
if (r)
return r;
- r = read_header();
+ r = read_header(&header);
if (r < 0)
- return r;
+ goto out;
fsid = header.fsid;
- return 0;
+out:
+ close();
+ return r;
}
int FileJournal::open(uint64_t fs_op_seq)
write_pos = get_top();
// read header?
- err = read_header();
+ err = read_header(&header);
if (err < 0)
return err;
return 0;
}
+void FileJournal::_close(int fd) const
+{
+ VOID_TEMP_FAILURE_RETRY(::close(fd));
+}
+
void FileJournal::close()
{
dout(1) << "close " << fn << dendl;
assert(writeq_empty());
assert(!must_write_header);
assert(fd >= 0);
- VOID_TEMP_FAILURE_RETRY(::close(fd));
+ _close(fd);
fd = -1;
}
int FileJournal::dump(ostream& out)
{
- int err = 0;
-
dout(10) << "dump" << dendl;
- err = _open(false, false);
+
+ assert(fd == -1);
+ int err = _open(false, false);
if (err)
return err;
- err = read_header();
- if (err < 0)
+ err = read_header(&header);
+ if (err < 0) {
+ close();
return err;
+ }
- read_pos = header.start;
+ off64_t next_pos = header.start;
JSONFormatter f(true);
f.open_object_section("journal");
f.close_section();
f.open_array_section("entries");
- uint64_t seq = 0;
+ uint64_t seq = header.start_seq;
while (1) {
bufferlist bl;
- uint64_t pos = read_pos;
- if (!read_entry(bl, seq)) {
- dout(3) << "journal_replay: end of journal, done." << dendl;
+ off64_t pos = next_pos;
+
+ if (!pos) {
+ dout(2) << "_dump -- not readable" << dendl;
+ return false;
+ }
+ stringstream ss;
+ read_entry_result result = do_read_entry(
+ pos,
+ &next_pos,
+ &bl,
+ &seq,
+ &ss);
+ if (result != SUCCESS) {
+ if (seq < header.committed_up_to) {
+ dout(2) << "Unable to read past sequence " << seq
+ << " but header indicates the journal has committed up through "
+ << header.committed_up_to << ", journal is corrupt" << dendl;
+ err = EINVAL;
+ }
+ dout(25) << ss.str() << dendl;
+ dout(25) << "No further valid entries found, journal is most likely valid"
+ << dendl;
break;
}
f.close_section();
f.flush(out);
dout(10) << "dump finish" << dendl;
- return 0;
+
+ close();
+ return err;
}
void FileJournal::stop_writer()
{
+ // Do nothing if writer already stopped or never started
+ if (!write_stop)
{
- Mutex::Locker l(write_lock);
- Mutex::Locker p(writeq_lock);
- write_stop = true;
- writeq_cond.Signal();
- // Doesn't hurt to signal commit_cond in case thread is waiting there
- // and caller didn't use committed_thru() first.
- commit_cond.Signal();
- }
- write_thread.join();
-
- // write journal header now so that we have less to replay on remount
- write_header_sync();
+ {
+ Mutex::Locker l(write_lock);
+ Mutex::Locker p(writeq_lock);
+ write_stop = true;
+ writeq_cond.Signal();
+ // Doesn't hurt to signal commit_cond in case thread is waiting there
+ // and caller didn't use committed_thru() first.
+ commit_cond.Signal();
+ }
+ write_thread.join();
+
+ // write journal header now so that we have less to replay on remount
+ write_header_sync();
+ }
#ifdef HAVE_LIBAIO
// stop aio completeion thread *after* writer thread has stopped
// and has submitted all of its io
- if (aio) {
+ if (aio && !aio_stop) {
aio_lock.Lock();
aio_stop = true;
aio_cond.Signal();
-void FileJournal::print_header()
+void FileJournal::print_header(const header_t &header) const
{
dout(10) << "header: block_size " << header.block_size
<< " alignment " << header.alignment
dout(10) << " write_pos " << write_pos << dendl;
}
-int FileJournal::read_header()
+int FileJournal::read_header(header_t *hdr) const
{
dout(10) << "read_header" << dendl;
bufferlist bl;
try {
bufferlist::iterator p = bl.begin();
- ::decode(header, p);
+ ::decode(*hdr, p);
}
catch (buffer::error& e) {
derr << "read_header error decoding journal header" << dendl;
* remove this or else this (eventually old) code will clobber newer
* code's flags.
*/
- if (header.flags > 3) {
+ if (hdr->flags > 3) {
derr << "read_header appears to have gibberish flags; assuming 0" << dendl;
- header.flags = 0;
+ hdr->flags = 0;
}
- print_header();
+ print_header(*hdr);
return 0;
}
put_throttle(1, peek_write().bl.length());
pop_write();
}
- print_header();
+ print_header(header);
}
return -ENOSPC; // hrm, full on first op
put_throttle(1, peek_write().bl.length());
pop_write();
}
- print_header();
+ print_header(header);
r = 0;
} else {
dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
}
must_write_header = true;
- print_header();
+ print_header(header);
// committed but unjournaled items
while (!writeq_empty() && peek_write().seq <= seq) {
int64_t olen,
bufferlist* bl,
off64_t *out_pos
- )
+ ) const
{
while (olen > 0) {
while (pos >= header.max_size)
&seq,
&ss);
if (result == SUCCESS) {
+ journalq.push_back( pair<uint64_t,off64_t>(seq, pos));
if (next_seq > seq) {
return false;
} else {
bufferlist *bl,
uint64_t *seq,
ostream *ss,
- entry_header_t *_h)
+ entry_header_t *_h) const
{
off64_t cur_pos = init_pos;
bufferlist _bl;
if (seq)
*seq = h->seq;
- // works around an apparent GCC 4.8(?) compiler bug about unaligned
- // bind by reference to (packed) h->seq
- journalq.push_back(
- pair<uint64_t,off64_t>(static_cast<uint64_t>(h->seq),
- static_cast<off64_t>(init_pos)));
if (next_pos)
*next_pos = cur_pos;
start = block_size;
}
- uint64_t get_fsid64() {
+ uint64_t get_fsid64() const {
return *(uint64_t*)&fsid.uuid[0];
}
int _open(bool wr, bool create=false);
int _open_block_device();
+ void _close(int fd) const;
void _check_disk_write_cache() const;
int _open_file(int64_t oldsize, blksize_t blksize, bool create);
- void print_header();
- int read_header();
+ void print_header(const header_t &hdr) const;
+ int read_header(header_t *hdr) const;
bufferptr prepare_header();
void start_writer();
void stop_writer();
int64_t len, ///< [in] length to read
bufferlist* bl, ///< [out] result
off64_t *out_pos ///< [out] next position to read, will be wrapped
- );
+ ) const;
void do_discard(int64_t offset, int64_t end);
}
} write_finish_thread;
- off64_t get_top() {
+ off64_t get_top() const {
return ROUND_UP_TO(sizeof(header), block_size);
}
throttle_ops(g_ceph_context, "filestore_ops", g_conf->journal_queue_max_ops),
throttle_bytes(g_ceph_context, "filestore_bytes", g_conf->journal_queue_max_bytes),
write_lock("FileJournal::write_lock", false, true, false, g_ceph_context),
- write_stop(false),
- aio_stop(false),
+ write_stop(true),
+ aio_stop(true),
write_thread(this),
- write_finish_thread(this) { }
+ write_finish_thread(this) {
+
+ if (aio && !directio) {
+ derr << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl;
+ aio = false;
+ }
+#ifndef HAVE_LIBAIO
+ if (aio) {
+ derr << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl;
+ aio = false;
+ }
+#endif
+ }
~FileJournal() {
+ assert(fd == -1);
delete[] zero_buf;
}
uint64_t *seq, ///< [out] seq of successful read
ostream *ss, ///< [out] error output
entry_header_t *h = 0 ///< [out] header
- ); ///< @return result code
+ ) const; ///< @return result code
bool read_entry(
bufferlist &bl,