OPTION(filestore_debug_verify_split, OPT_BOOL, false)
OPTION(journal_dio, OPT_BOOL, true)
OPTION(journal_aio, OPT_BOOL, false)
+
+// max bytes FileJournal::read_entry scans past an unreadable entry while looking for a later valid one (default 10<<20 = 10 MiB)
+OPTION(journal_max_corrupt_search, OPT_U64, 10<<20)
OPTION(journal_block_align, OPT_BOOL, true)
OPTION(journal_write_header_frequency, OPT_U64, 0)
OPTION(journal_max_write_bytes, OPT_INT, 10 << 20)
OPTION(journal_align_min_size, OPT_INT, 64 << 10) // align data payloads >= this.
OPTION(journal_replay_from, OPT_INT, 0)
OPTION(journal_zero_on_create, OPT_BOOL, false)
+OPTION(journal_ignore_corruption, OPT_BOOL, false) // when journal corruption is detected, have read_entry flag it via its corrupt out-parameter and return false instead of asserting
OPTION(rbd_cache, OPT_BOOL, false) // whether to enable caching (writeback unless rbd_cache_max_dirty is 0)
OPTION(rbd_cache_size, OPT_LONGLONG, 32<<20) // cache size in bytes
OPTION(rbd_cache_max_dirty, OPT_LONGLONG, 24<<20) // dirty limit in bytes - set to 0 for write-through caching
*out_pos = pos;
}
-bool FileJournal::read_entry(bufferlist& bl, uint64_t& seq)
+/**
+ * Read the journal entry at read_pos.
+ *
+ * If the entry cannot be read, scan forward (bounded by
+ * journal_max_corrupt_search bytes) for a later valid entry. Finding a
+ * valid entry with seq >= next_seq beyond an unreadable region, or being
+ * unable to read up to header.committed_up_to, is treated as corruption:
+ * the process asserts unless journal_ignore_corruption is set.
+ *
+ * @param bl       [out] holds the entry payload when true is returned
+ * @param next_seq [in,out] lowest sequence number the caller accepts;
+ *                 set to the sequence number read when true is returned
+ * @param corrupt  [out] may be null; cleared on entry, set to true when
+ *                 corruption is detected and journal_ignore_corruption
+ *                 is enabled
+ * @return true if an entry with seq >= next_seq was read and read_pos was
+ *         advanced past it, false otherwise
+ */
+bool FileJournal::read_entry(
+ bufferlist &bl,
+ uint64_t &next_seq,
+ bool *corrupt)
{
+ if (corrupt)
+ *corrupt = false;
+ uint64_t seq = next_seq;
+
if (!read_pos) {
dout(2) << "read_entry -- not readable" << dendl;
return false;
}
off64_t pos = read_pos;
+ off64_t next_pos = pos;
+ stringstream ss;
+ read_entry_result result = do_read_entry(
+ pos,
+ &next_pos,
+ &bl,
+ &seq,
+ &ss);
+ if (result == SUCCESS) {
+ if (next_seq > seq) {
+ // valid entry, but older than what the caller asked for
+ return false;
+ } else {
+ read_pos = next_pos;
+ next_seq = seq;
+ return true;
+ }
+ }
+
+ // The entry at read_pos was unreadable. Keep probing the positions
+ // suggested by do_read_entry to tell "end of journal" apart from
+ // "damaged region followed by valid entries".
+ stringstream errss;
+ while (result == MAYBE_CORRUPT &&
+ (static_cast<uint64_t>(pos - read_pos) <
+ g_conf->journal_max_corrupt_search)) {
+ errss << "Entry at pos " << pos << " possibly corrupt due to: ("
+ << ss.str() << ")" << std::endl;
+ // clear() would only reset the stream's state flags; str("") is what
+ // discards the previous reason so messages do not pile up per probe.
+ ss.str("");
+ pos = next_pos;
+ result = do_read_entry(
+ pos,
+ &next_pos,
+ &bl,
+ &seq,
+ &ss);
+ }
+
+ if (result == SUCCESS) {
+ if (seq >= next_seq) {
+ // a current entry exists beyond the unreadable region, so the
+ // entries in between have been lost
+ derr << errss.str() << dendl;
+ derr << "Entry at pos " << pos << " valid, there are missing sequence "
+ << "numbers prior to seq " << seq << dendl;
+ if (g_conf->journal_ignore_corruption) {
+ if (corrupt)
+ *corrupt = true;
+ return false;
+ } else {
+ assert(0);
+ }
+ } else { // We read a valid, but old entry, no problem
+ return false;
+ }
+ }
+
+ // No later valid entry was found; make sure we at least got as far as
+ // the header says was committed.
+ if (seq < header.committed_up_to) {
+ derr << "Unable to read past sequence " << seq
+ << " but header indicates the journal has committed up through "
+ << header.committed_up_to << ", journal is corrupt" << dendl;
+ if (g_conf->journal_ignore_corruption) {
+ if (corrupt)
+ *corrupt = true;
+ return false;
+ } else {
+ assert(0);
+ }
+ }
+
+ dout(2) << errss.str() << dendl;
+ dout(2) << "No further valid entries found, journal is most likely valid"
+ << dendl;
+ return false;
+}
+
+/**
+ * Read and validate the single journal entry that starts at pos.
+ *
+ * Checks the header magic, that the footer is a byte-for-byte copy of the
+ * header and, when header_t::FLAG_CRC is set or the entry carries a
+ * non-zero crc, the payload crc. Any failed check returns MAYBE_CORRUPT
+ * with a reason in *ss and the next offset to probe in *next_pos (4k
+ * ahead for a bad magic, otherwise the offset just past the footer).
+ *
+ * On SUCCESS the entry is recorded in journalq at its starting offset,
+ * *seq and *_h are filled in and *next_pos is the offset just past the
+ * entry. Note that *bl is cleared and overwritten before the footer and
+ * crc checks run, so it is not preserved on failure.
+ *
+ * Every pointer argument may be null.
+ */
+FileJournal::read_entry_result FileJournal::do_read_entry(
+ off64_t pos,
+ off64_t *next_pos,
+ bufferlist *bl,
+ uint64_t *seq,
+ ostream *ss,
+ entry_header_t *_h)
+{
+ bufferlist _bl;
+ if (!bl)
+ bl = &_bl;
+ // pos is advanced past the entry below; remember where the entry itself
+ // starts, since that is what the log line and journalq must record.
+ const off64_t init_pos = pos;
// header
entry_header_t *h;
bufferlist hbl;
- wrap_read_bl(pos, sizeof(*h), &hbl, &pos);
+ off64_t _next_pos;
+ wrap_read_bl(pos, sizeof(*h), &hbl, &_next_pos);
h = (entry_header_t *)hbl.c_str();
- if (!h->check_magic(read_pos, header.get_fsid64())) {
- if (header.committed_up_to && seq <= header.committed_up_to) {
- derr << "ERROR: header claims we are committed up through "
- << header.committed_up_to << " however, we have failed to read "
- << "entry " << seq
- << " journal is likely corrupt" << dendl;
- assert(0 == "FileJournal::read_entry(): corrupt journal");
- }
- dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl;
- return false;
+ if (!h->check_magic(pos, header.get_fsid64())) {
+ dout(2) << "read_entry " << pos
+ << " : bad header magic, end of journal" << dendl;
+ if (ss)
+ *ss << "bad header magic";
+ if (next_pos)
+ *next_pos = pos + (4<<10); // check 4k ahead
+ return MAYBE_CORRUPT;
}
+ pos = _next_pos;
// pad + body + pad
if (h->pre_pad)
pos += h->pre_pad;
- bl.clear();
- wrap_read_bl(pos, h->len, &bl, &pos);
+ bl->clear();
+ wrap_read_bl(pos, h->len, bl, &pos);
if (h->post_pad)
pos += h->post_pad;
wrap_read_bl(pos, sizeof(*f), &fbl, &pos);
f = (entry_header_t *)fbl.c_str();
if (memcmp(f, h, sizeof(*f))) {
- dout(2) << "read_entry " << read_pos << " : bad footer magic, partial entry, end of journal" << dendl;
- return false;
+ if (ss)
+ *ss << "bad footer magic, partial entry";
+ if (next_pos)
+ *next_pos = pos;
+ return MAYBE_CORRUPT;
}
if ((header.flags & header_t::FLAG_CRC) || // if explicitly enabled (new journal)
h->crc32c != 0) { // newer entry in old journal
- uint32_t actual_crc = bl.crc32c(0);
+ uint32_t actual_crc = bl->crc32c(0);
if (actual_crc != h->crc32c) {
- dout(2) << "read_entry " << read_pos << " : header crc (" << h->crc32c
- << ") doesn't match body crc (" << actual_crc << ")" << dendl;
- return false;
+ if (ss)
+ *ss << "header crc (" << h->crc32c
+ << ") doesn't match body crc (" << actual_crc << ")";
+ if (next_pos)
+ *next_pos = pos;
+ return MAYBE_CORRUPT;
}
}
// yay!
- dout(2) << "read_entry " << read_pos << " : seq " << h->seq
+ dout(2) << "read_entry " << init_pos << " : seq " << h->seq
<< " " << h->len << " bytes"
<< dendl;
- if (seq && h->seq < seq) {
- dout(2) << "read_entry " << read_pos << " : got seq " << h->seq << ", expected " << seq << ", stopping" << dendl;
- return false;
- }
-
// ok!
- seq = h->seq;
- journalq.push_back(pair<uint64_t,off64_t>(h->seq, read_pos));
+ if (seq)
+ *seq = h->seq;
+ // record where the entry begins (as the pre-patch code did via read_pos),
+ // not the offset that follows it
+ journalq.push_back(pair<uint64_t,off64_t>(h->seq, init_pos));
+
+ if (next_pos)
+ *next_pos = pos;
+
+ if (_h)
+ *_h = *h;
- read_pos = pos;
- assert(read_pos % header.alignment == 0);
-
- return true;
+ assert(pos % header.alignment == 0);
+ return SUCCESS;
}
void FileJournal::throttle()
void set_wait_on_full(bool b) { wait_on_full = b; } // setter: stores b in wait_on_full
// reads
- bool read_entry(bufferlist& bl, uint64_t& seq);
+
+ /// Result code for do_read_entry
+ enum read_entry_result {
+ SUCCESS, ///< the entry passed every validation check that was performed
+ FAILURE, ///< NOTE(review): not returned by any code visible in this change
+ MAYBE_CORRUPT ///< a validation check failed: end of journal, or corruption
+ };
+
+ /**
+ * do_read_entry
+ *
+ * Reads the entry starting at pos. If the entry appears
+ * clean, SUCCESS is returned, *bl will contain the payload,
+ * *seq will contain the sequence number, and *next_pos will
+ * reflect the next read position. A successful read also
+ * appends the entry to journalq.
+ *
+ * If a check fails (bad header magic, footer not matching the
+ * header, or crc mismatch) MAYBE_CORRUPT is returned, *ss will
+ * contain debug text and *next_pos will contain the next index
+ * to check. *seq is left unchanged in that case, but *bl may
+ * already have been cleared and overwritten. If a later call at
+ * that index returns SUCCESS, the journal is most likely corrupt.
+ *
+ * Each pointer argument may be null, in which case that output
+ * is simply not produced.
+ */
+ read_entry_result do_read_entry(
+ off64_t pos, ///< [in] position to read
+ off64_t *next_pos, ///< [out] next position to read
+ bufferlist* bl, ///< [out] payload for successful read
+ uint64_t *seq, ///< [out] seq of successful read
+ ostream *ss, ///< [out] error output
+ entry_header_t *h = 0 ///< [out] header
+ ); ///< @return result code
+
+ bool read_entry( // reads the entry at read_pos; returns true only if an entry was accepted and read_pos advanced
+ bufferlist &bl, ///< [out] entry payload when true is returned
+ uint64_t &last_seq, ///< [in,out] lowest acceptable seq (named next_seq in the definition); set to the seq read on success
+ bool *corrupt ///< [out] may be null; cleared on entry, set when corruption is detected and journal_ignore_corruption is on
+ );
+
+ bool read_entry( // convenience overload for callers that do not need the corruption flag
+ bufferlist &bl,
+ uint64_t &last_seq) {
+ return read_entry(bl, last_seq, 0); // forwards with a null corrupt pointer
+ }
};
WRITE_CLASS_ENCODER(FileJournal::header_t)