OPTION(mds_kill_journal_at, OPT_INT, 0)
OPTION(mds_kill_journal_expire_at, OPT_INT, 0)
OPTION(mds_kill_journal_replay_at, OPT_INT, 0)
+OPTION(mds_journal_format, OPT_U32, 1) // Default to most recent JOURNAL_FORMAT_*
OPTION(mds_kill_create_at, OPT_INT, 0)
OPTION(mds_inject_traceless_reply_probability, OPT_DOUBLE, 0) /* percentage
of MDS modify replies to skip sending the
dout(5) << "create empty log" << dendl;
init_journaler();
journaler->set_writeable();
- journaler->create(&mds->mdcache->default_log_layout);
+ journaler->create(&mds->mdcache->default_log_layout, g_conf->mds_journal_format);
journaler->write_head(c);
logger->set(l_mdl_expos, journaler->get_expire_pos());
}
/* Check whether the front journal format is acceptable or needs re-write */
- if (front_journal->get_stream_format() >= JOURNAL_FORMAT_RESILIENT) {
+ if (front_journal->get_stream_format() >= g_conf->mds_journal_format) {
/* Great, the journal is of current format and ready to rock, hook
* it into this->journaler and complete */
journaler = front_journal;
dout(4) << "Writing new journal header " << jp.back << dendl;
ceph_file_layout new_layout = old_journal->get_layout();
new_journal->set_writeable();
- new_journal->create(&new_layout);
+ new_journal->create(&new_layout, g_conf->mds_journal_format);
/* Write the new journal header to RADOS */
C_SaferCond write_head_wait;
readonly = false;
}
-void Journaler::create(ceph_file_layout *l)
+void Journaler::create(ceph_file_layout *l, stream_format_t const sf)
{
assert(!readonly);
ldout(cct, 1) << "create blank journal" << dendl;
state = STATE_ACTIVE;
+ stream_format = sf;
+ journal_stream.set_format(sf);
set_layout(l);
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos =
last_written.expire_pos = expire_pos;
last_written.unused_field = expire_pos;
last_written.write_pos = safe_pos;
+ last_written.stream_format = stream_format;
ldout(cct, 10) << "write_head " << last_written << dendl;
last_wrote_head = ceph_clock_now(cct);
stream_format_t format;
public:
- JournalStream() : format(JOURNAL_FORMAT_RESILIENT) {}
JournalStream(stream_format_t format_) : format(format_) {}
void set_format(stream_format_t format_) {format = format_;}
stream_format_t stream_format; //< The encoding of LogEvents within the journal byte stream
Header(const char *m="") :
- trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0),
- magic(m),
- stream_format(JOURNAL_FORMAT_RESILIENT) {
+ trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0), magic(m), stream_format(-1) {
memset(&layout, 0, sizeof(layout));
}
public:
Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim) :
last_written(mag), last_committed(mag), cct(obj->cct),
- ino(ino_), pg_pool(pool), readonly(true), magic(mag),
+ ino(ino_), pg_pool(pool), readonly(true),
+ stream_format(-1), journal_stream(-1),
+ magic(mag),
objecter(obj), filer(objecter), logger(l), logger_key_lat(lkey),
timer(tim), delay_flush_event(0),
state(STATE_UNDEF), error(0),
read_pos(0), requested_pos(0), received_pos(0),
fetch_len(0), temp_fetch_len(0),
on_readable(0), on_write_error(NULL),
- expire_pos(0), trimming_pos(0), trimmed_pos(0)
+ expire_pos(0), trimming_pos(0), trimmed_pos(0)
{
memset(&layout, 0, sizeof(layout));
}
friend class C_EraseFinish;
public:
- void create(ceph_file_layout *layout);
+ void create(ceph_file_layout *layout, stream_format_t const sf);
void recover(Context *onfinish);
void reread_head(Context *onfinish);
void reprobe(Context *onfinish);