OPTION(mds_log, OPT_BOOL, true)
OPTION(mds_log_skip_corrupt_events, OPT_BOOL, false)
OPTION(mds_log_max_events, OPT_INT, -1)
+OPTION(mds_log_events_per_segment, OPT_INT, 1024) // start a new log segment once the current one reaches this many events
OPTION(mds_log_segment_size, OPT_INT, 0) // segment size for mds log,
// defaults to g_default_file_layout.fl_object_size (4MB)
OPTION(mds_log_max_segments, OPT_INT, 30)
// let the event register itself in the segment
assert(!segments.empty());
- le->_segment = segments.rbegin()->second;
- le->_segment->num_events++;
- le->update_segment();
+ LogSegment *ls = segments.rbegin()->second;
+ ls->num_events++;
+ le->_segment = ls;
+ le->update_segment();
le->set_stamp(ceph_clock_now(g_ceph_context));
-
- num_events++;
- assert(!capped);
-
- // encode it, with event type
- {
- bufferlist bl;
- le->encode_with_header(bl);
-
- dout(5) << "submit_entry " << journaler->get_write_pos() << "~" << bl.length()
- << " : " << *le << dendl;
-
- // journal it.
- journaler->append_entry(bl); // bl is destroyed.
- }
- le->_segment->end = journaler->get_write_pos();
+ pending_events[ls->seq].push_back(PendingEvent(le, c));
+ num_events++;
if (logger) {
logger->inc(l_mdl_evadd);
logger->set(l_mdl_ev, num_events);
- logger->set(l_mdl_wrpos, journaler->get_write_pos());
}
unflushed++;
-
- if (c)
- journaler->wait_for_flush(c);
- // start a new segment?
- // FIXME: should this go elsewhere?
- uint64_t last_seg = get_last_segment_offset();
uint64_t period = journaler->get_layout_period();
- // start a new segment if there are none or if we reach end of last segment
+ // start a new segment?
if (le->get_type() == EVENT_SUBTREEMAP ||
(le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) {
// avoid infinite loop when ESubtreeMap is very large.
// do not insert ESubtreeMap among EImportFinish events that finish
// disambiguating imports, because the ESubtreeMap reflects the subtree
// state when all EImportFinish events are replayed.
- } else if (journaler->get_write_pos()/period != last_seg/period) {
- dout(10) << "submit_entry also starting new segment: last = " << last_seg
- << ", cur pos = " << journaler->get_write_pos() << dendl;
+ } else if (ls->end/period != ls->offset/period ||
+ ls->num_events >= g_conf->mds_log_events_per_segment) {
+ dout(10) << "submit_entry also starting new segment: last = "
+ << ls->seq << "/" << ls->offset << ", event seq = " << event_seq << dendl;
_start_new_segment();
} else if (g_conf->mds_debug_subtrees &&
le->get_type() != EVENT_SUBTREEMAP_TEST) {
sle->set_type(EVENT_SUBTREEMAP_TEST);
_submit_entry(sle, NULL);
}
-
- delete le;
}
void MDLog::_submit_thread()
// Register completion c to be notified about the log.
//  - mds_log disabled: c is completed immediately with result 0.
//  - events still queued in pending_events: c is appended to the newest queue
//    as an event-less PendingEvent and submit_cond is signalled (the consumer
//    of that queue is not visible here -- presumably the submit thread).
//  - nothing queued: c is handed directly to journaler->wait_for_flush().
void MDLog::wait_for_safe(Context *c)
{
- if (g_conf->mds_log) {
- // wait
- journaler->wait_for_flush(c);
- } else {
+ if (!g_conf->mds_log) {
// hack: bypass.
// NOTE(review): c is dereferenced unconditionally here, while the final
// path below tolerates a NULL c -- confirm callers never pass NULL.
c->complete(0);
+ return;
+ }
+
// pending_events is only inspected and modified while submit_mutex is held.
+ submit_mutex.Lock();
+
+ bool no_pending = true;
+ if (!pending_events.empty()) {
// piggy-back the waiter on the newest pending queue (NULL event, context c).
+ pending_events.rbegin()->second.push_back(PendingEvent(NULL, c));
+ no_pending = false;
+ submit_cond.Signal();
}
+
+ submit_mutex.Unlock();
+
// nothing was queued: wait on the journaler directly, outside the lock.
+ if (no_pending && c)
+ journaler->wait_for_flush(c);
}
// Request a journal flush and reset the unflushed counter.
// If events are still queued in pending_events, a marker PendingEvent is
// appended to the newest queue and submit_cond is signalled instead of
// flushing here; otherwise journaler->flush() is called directly, but only
// when unflushed was non-zero on entry.
void MDLog::flush()
{
- if (unflushed)
- journaler->flush();
+ submit_mutex.Lock();
+
// remember whether anything was unflushed before the counter is cleared.
+ bool do_flush = unflushed > 0;
unflushed = 0;
+ if (!pending_events.empty()) {
// NOTE(review): the third constructor argument (true) is presumably a
// "flush" flag on PendingEvent -- confirm against its declaration.
+ pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
+ do_flush = false;
+ submit_cond.Signal();
+ }
+
+ submit_mutex.Unlock();
+
// the direct flush happens after submit_mutex has been released.
+ if (do_flush)
+ journaler->flush();
}
void MDLog::cap()
uint64_t seq = event_seq + 1;
dout(7) << __func__ << " seq " << seq << dendl;
- segments[seq] = new LogSegment(seq, journaler->get_write_pos());
+ segments[seq] = new LogSegment(seq);
logger->inc(l_mdl_segadd);
logger->set(l_mdl_seg, segments.size());
friend class C_MDS_WroteImportMap;
friend class MDCache;
-public:
// Returns the key of the last entry in the segments map (segments are
// inserted keyed by their seq). Asserts that at least one segment exists.
uint64_t get_last_segment_seq() {
assert(!segments.empty());
return segments.rbegin()->first;
}
// NOTE(review): this removed accessor returned segments.rbegin()->first, the
// same map key that get_last_segment_seq() returns, rather than an offset.
- uint64_t get_last_segment_offset() {
- assert(!segments.empty());
- return segments.rbegin()->first;
- }
// Returns the LogSegment stored at the first entry of the segments map.
// NOTE(review): unlike get_last_segment_seq(), there is no emptiness assert
// here; callers presumably guarantee segments is non-empty -- confirm.
LogSegment *get_oldest_segment() {
return segments.begin()->second;
}
// Submit event e with optional completion c (defaults to 0).
// Holds submit_mutex via Mutex::Locker around _submit_entry(), then signals
// submit_cond before returning.
void submit_entry(LogEvent *e, Context *c = 0) {
Mutex::Locker l(submit_mutex);
_submit_entry(e, c);
+ submit_cond.Signal();
}
// Convenience wrapper: calls _start_entry(e) followed by _submit_entry(e, c)
// under a single hold of submit_mutex, then signals submit_cond.
void start_submit_entry(LogEvent *e, Context *c = 0) {
Mutex::Locker l(submit_mutex);
_start_entry(e);
_submit_entry(e, c);
+ submit_cond.Signal();
}
// True while cur_event is non-NULL.
bool entry_is_open() { return cur_event != NULL; }