#include "common/Cond.h"
#include "events/ESubtreeMap.h"
+#include "events/ESegment.h"
#include "common/config.h"
#include "common/errno.h"
debug_subtrees = g_conf().get_val<bool>("mds_debug_subtrees");
events_per_segment = g_conf().get_val<uint64_t>("mds_log_events_per_segment");
pause = g_conf().get_val<bool>("mds_log_pause");
+ major_segment_event_ratio = g_conf().get_val<uint64_t>("mds_log_major_segment_event_ratio");
max_segments = g_conf().get_val<uint64_t>("mds_log_max_segments");
max_events = g_conf().get_val<int64_t>("mds_log_max_events");
}
plb.add_u64(l_mdl_evexd, "evexd", "Current expired events");
plb.add_u64(l_mdl_segexg, "segexg", "Expiring segments");
plb.add_u64(l_mdl_segexd, "segexd", "Current expired segments");
+ plb.add_u64(l_mdl_segmjr, "segmjr", "Major Segments");
plb.add_u64_counter(l_mdl_replayed, "replayed", "Events replayed",
"repl", PerfCountersBuilder::PRIO_INTERESTING);
plb.add_time_avg(l_mdl_jlat, "jlat", "Journaler flush latency");
// -------------------------------------------------
+LogSegment* MDLog::_start_new_segment(SegmentBoundary* sb) // open a new LogSegment for boundary event `sb` and return it
+{
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); // caller must already hold mds_lock
+ // NOTE(review): this is reached via submit_entry(), which holds submit_mutex, and the removed _prepare_new_segment() asserted submit_mutex -- confirm that asserting only mds_lock is intended.
+ auto ls = new LogSegment(event_seq); // the segment is identified by the current event_seq
+ segments[event_seq] = ls; // register it; `segments` stores the raw pointer
+ logger->inc(l_mdl_segadd); // bump the l_mdl_segadd counter
+ logger->set(l_mdl_seg, segments.size()); // publish the current number of tracked segments
+ sb->set_seq(event_seq); // stamp the boundary event with the seq of the segment it opens
+
+ // Adjust to next stray dir
+ mds->mdcache->advance_stray();
+ return ls;
+}
+
void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase* c)
{
dout(20) << __func__ << " " << *le << dendl;
ceph_assert(!mds_is_shutting_down);
event_seq++;
+ events_since_last_major_segment++;
+
+ if (auto sb = dynamic_cast<SegmentBoundary*>(le); sb) {
+ auto ls = _start_new_segment(sb);
+ if (sb->is_major_segment_boundary()) {
+ major_segments.insert(ls->seq);
+ logger->set(l_mdl_segmjr, major_segments.size());
+ events_since_last_major_segment = 0;
+ }
+ }
EMetaBlob *metablob = le->get_metablob();
if (metablob) {
unflushed++;
}
-void MDLog::_segment_upkeep(LogEvent* le)
+void MDLog::_segment_upkeep()
{
ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
uint64_t period = journaler->get_layout_period();
auto ls = get_current_segment();
// start a new segment?
- if (le->get_type() == EVENT_SUBTREEMAP ||
- (le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) {
- // avoid infinite loop when ESubtreeMap is very large.
- // do not insert ESubtreeMap among EImportFinish events that finish
- // disambiguate imports. Because the ESubtreeMap reflects the subtree
- // state when all EImportFinish events are replayed.
+ if (events_since_last_major_segment > events_per_segment*major_segment_event_ratio) {
+ dout(10) << __func__ << ": starting new major segment, current " << *ls << dendl;
+ auto sle = mds->mdcache->create_subtree_map();
+ _submit_entry(sle, NULL);
} else if (ls->end/period != ls->offset/period || ls->num_events >= events_per_segment) {
- dout(10) << "submit_entry also starting new segment: last = "
- << ls->seq << "/" << ls->offset << ", event seq = " << event_seq << dendl;
- _start_new_segment();
- } else if (debug_subtrees && le->get_type() != EVENT_SUBTREEMAP_TEST) {
+ dout(10) << __func__ << ": starting new segment, current " << *ls << dendl;
+ auto sb = new ESegment();
+ _submit_entry(sb, nullptr);
+ } else if (debug_subtrees && ls->num_events > 1) {
// debug: journal this every time to catch subtree replay bugs.
// use a different event id so it doesn't get interpreted as a
// LogSegment boundary on replay.
dout(10) << __func__ << ": creating test subtree map" << dendl;
- LogEvent *sle = mds->mdcache->create_subtree_map();
+ auto sle = mds->mdcache->create_subtree_map();
sle->set_type(EVENT_SUBTREEMAP_TEST);
_submit_entry(sle, NULL);
}
uint64_t write_pos = journaler->get_write_pos();
le->set_start_off(write_pos);
- if (le->get_type() == EVENT_SUBTREEMAP)
+ if (dynamic_cast<SegmentBoundary*>(le)) {
ls->offset = write_pos;
+ }
dout(5) << "_submit_thread " << write_pos << "~" << bl.length()
<< " : " << *le << dendl;
}
}
-
-// -----------------------------
-// segments
-
-void MDLog::_start_new_segment()
-{
- _prepare_new_segment();
- _journal_segment_subtree_map(NULL);
-}
-
-void MDLog::_prepare_new_segment()
-{
- ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
-
- uint64_t seq = event_seq + 1;
- dout(7) << __func__ << " seq " << seq << dendl;
-
- segments[seq] = new LogSegment(seq);
-
- logger->inc(l_mdl_segadd);
- logger->set(l_mdl_seg, segments.size());
-
- // Adjust to next stray dir
- mds->mdcache->advance_stray();
-}
-
-void MDLog::_journal_segment_subtree_map(MDSContext *onsync)
-{
- ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
-
- dout(7) << __func__ << dendl;
- ESubtreeMap *sle = mds->mdcache->create_subtree_map();
- sle->event_seq = get_last_segment_seq();
-
- _submit_entry(sle, new C_MDL_Flushed(this, onsync));
-}
-
class C_OFT_Committed : public MDSInternalContext {
MDLog *mdlog;
uint64_t seq;
{
ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
- uint64_t oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq();
+ uint64_t const oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq();
// trim expired segments?
bool trimmed = false;
- while (!segments.empty()) {
- LogSegment *ls = segments.begin()->second;
+ uint64_t end = 0;
+ for (auto it = segments.begin(); it != segments.end(); ++it) {
+ auto& [seq, ls] = *it;
+ dout(20) << __func__ << ": examining seq=" << seq << " ls=" << *ls << dendl;
+
+ if (auto msit = major_segments.find(seq); msit != major_segments.end() && end > 0) {
+ dout(10) << __func__ << ": expiring up to this major segment " << seq << dendl;
+ uint64_t expire_pos = 0;
+ for (auto& [seq2, ls2] : segments) {
+ if (seq <= seq2) {
+ break;
+ }
+ dout(20) << __func__ << ": expiring " << *ls2 << dendl;
+ expired_events -= ls2->num_events;
+ expired_segments.erase(ls2);
+ if (pre_segments_size > 0)
+ pre_segments_size--;
+ num_events -= ls2->num_events;
+ logger->inc(l_mdl_evtrm, ls2->num_events);
+ logger->inc(l_mdl_segtrm);
+ expire_pos = ls2->end;
+ delete ls2;
+ }
+ segments.erase(segments.begin(), it);
+ logger->set(l_mdl_seg, segments.size());
+ major_segments.erase(major_segments.begin(), msit);
+ logger->set(l_mdl_segmjr, major_segments.size());
+
+ auto jexpire_pos = journaler->get_expire_pos();
+ if (jexpire_pos < expire_pos) {
+ journaler->set_expire_pos(expire_pos);
+ logger->set(l_mdl_expos, expire_pos);
+ } else {
+ logger->set(l_mdl_expos, jexpire_pos);
+ }
+ trimmed = true;
+ }
+
if (!expired_segments.count(ls)) {
- dout(10) << "_trim_expired_segments waiting for " << ls->seq << "/" << ls->offset
- << " to expire" << dendl;
+ dout(10) << __func__ << " waiting for expiry " << *ls << dendl;
break;
}
if (!mds_is_shutting_down && ls->seq >= oft_committed_seq) {
- dout(10) << "_trim_expired_segments open file table committedseq " << oft_committed_seq
+ dout(10) << __func__ << " defer expire for open file table committedseq " << oft_committed_seq
<< " <= " << ls->seq << "/" << ls->offset << dendl;
break;
}
- dout(10) << "_trim_expired_segments trimming expired "
- << ls->seq << "/0x" << std::hex << ls->offset << std::dec << dendl;
- expired_events -= ls->num_events;
- expired_segments.erase(ls);
- if (pre_segments_size > 0)
- pre_segments_size--;
- num_events -= ls->num_events;
-
- // this was the oldest segment, adjust expire pos
- if (journaler->get_expire_pos() < ls->end) {
- journaler->set_expire_pos(ls->end);
- logger->set(l_mdl_expos, ls->end);
- } else {
- logger->set(l_mdl_expos, ls->offset);
- }
-
- logger->inc(l_mdl_segtrm);
- logger->inc(l_mdl_evtrm, ls->num_events);
-
- segments.erase(ls->seq);
- delete ls;
- trimmed = true;
+ end = seq;
+ dout(10) << __func__ << ": maybe expiring " << *ls << dendl;
}
submit_mutex.unlock();
logger->set(l_mdl_ev, num_events);
logger->set(l_mdl_evexd, expired_events);
- logger->set(l_mdl_seg, segments.size());
logger->set(l_mdl_segexd, expired_segments.size());
}
if (le) {
bool modified = false;
- if (le->get_type() == EVENT_SUBTREEMAP ||
- le->get_type() == EVENT_RESETJOURNAL) {
- auto sle = dynamic_cast<ESubtreeMap*>(le.get());
- if (sle == NULL || sle->event_seq == 0) {
+ if (auto sb = dynamic_cast<SegmentBoundary*>(le.get()); sb) {
+ if (sb->get_seq() == 0) {
// A non-explicit event seq: the effective sequence number
// of this segment is its position in the old journal and
// the new effective sequence number will be its position
|| le->get_type() == EVENT_SUBTREEMAP_TEST) {
auto& sle = dynamic_cast<ESubtreeMap&>(*le);
dout(20) << __func__ << " zeroing expire_pos in subtreemap event at "
- << le_pos << " seq=" << sle.event_seq << dendl;
+ << le_pos << " seq=" << sle.get_seq() << dendl;
sle.expire_pos = 0;
modified = true;
}
ceph_abort(); // Should be unreachable because damaged() calls
// respawn()
}
-
}
le->set_start_off(pos);
- // new segment?
- if (le->get_type() == EVENT_SUBTREEMAP ||
- le->get_type() == EVENT_RESETJOURNAL) {
- auto sle = dynamic_cast<ESubtreeMap*>(le.get());
- if (sle && sle->event_seq > 0)
- event_seq = sle->event_seq;
- else
- event_seq = pos;
+ // have we seen an import map yet?
+ if (segments.empty() && !dynamic_cast<ESubtreeMap*>(le.get())) {
+ dout(1) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
+ << " " << le->get_stamp() << " -- waiting for ESubtreeMap. (skipping " << *le << ")" << dendl;
+ continue;
+ }
+
+ events_since_last_major_segment++;
+ if (auto sb = dynamic_cast<SegmentBoundary*>(le.get()); sb) {
+ auto seq = sb->get_seq();
+ if (seq > 0) {
+ event_seq = seq;
+ } else {
+ event_seq = pos;
+ }
segments[event_seq] = new LogSegment(event_seq, pos);
logger->set(l_mdl_seg, segments.size());
+ if (sb->is_major_segment_boundary()) {
+ major_segments.insert(event_seq);
+ logger->set(l_mdl_segmjr, major_segments.size());
+ events_since_last_major_segment = 0;
+ }
} else {
event_seq++;
}
- // have we seen an import map yet?
- if (segments.empty()) {
- dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
- << " " << le->get_stamp() << " -- waiting for subtree_map. (skipping " << *le << ")" << dendl;
- } else {
- dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
- << " " << le->get_stamp() << ": " << *le << dendl;
- le->_segment = get_current_segment(); // replay may need this
- le->_segment->num_events++;
- le->_segment->end = journaler->get_read_pos();
- num_events++;
- logger->set(l_mdl_ev, num_events);
-
- {
- std::lock_guard l(mds->mds_lock);
- if (mds->is_daemon_stopping()) {
- return;
- }
- logger->inc(l_mdl_replayed);
- le->replay(mds);
+ dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
+ << " " << le->get_stamp() << ": " << *le << dendl;
+ le->_segment = get_current_segment(); // replay may need this
+ le->_segment->num_events++;
+ le->_segment->end = journaler->get_read_pos();
+ num_events++;
+ logger->set(l_mdl_ev, num_events);
+
+ {
+ std::lock_guard l(mds->mds_lock);
+ if (mds->is_daemon_stopping()) {
+ return;
}
+ logger->inc(l_mdl_replayed);
+ le->replay(mds);
}
logger->set(l_mdl_rdpos, pos);
if (changed.count("mds_log_events_per_segment")) {
events_per_segment = g_conf().get_val<uint64_t>("mds_log_events_per_segment");
}
+ if (changed.count("mds_log_major_segment_event_ratio")) {
+ major_segment_event_ratio = g_conf().get_val<uint64_t>("mds_log_major_segment_event_ratio");
+ }
if (changed.count("mds_log_max_events")) {
max_events = g_conf().get_val<int64_t>("mds_log_max_events");
}
l_mdl_segex,
l_mdl_segtrm,
l_mdl_seg,
+ l_mdl_segmjr,
l_mdl_segexg,
l_mdl_segexd,
l_mdl_expos,
#include "LogSegment.h"
#include "MDSMap.h"
+#include "SegmentBoundary.h"
#include <list>
#include <map>
void create_logger();
void set_write_iohint(unsigned iohint_flags);
- void start_new_segment() {
- std::lock_guard l(submit_mutex);
- _start_new_segment();
- }
- void prepare_new_segment() {
- std::lock_guard l(submit_mutex);
- _prepare_new_segment();
- }
- void journal_segment_subtree_map(MDSContext *onsync=NULL) {
- {
- std::lock_guard l{submit_mutex};
- _journal_segment_subtree_map(onsync);
- }
- if (onsync)
- flush();
- }
-
// Return the segment stored under the highest key in `segments`, or NULL
// when no segment exists. Unlike get_last_segment_seq(), this does not
// assert that `segments` is non-empty.
LogSegment *peek_current_segment() {
return segments.empty() ? NULL : segments.rbegin()->second;
}
// Accessor for the `journaler` member pointer.
Journaler *get_journaler() { return journaler; }
// True when `segments` holds no entries.
bool empty() const { return segments.empty(); }
+ uint64_t get_last_major_segment_seq() const { // largest sequence number recorded in major_segments
+ ceph_assert(!major_segments.empty()); // precondition: at least one major segment is tracked
+ return *major_segments.rbegin(); // major_segments is an ordered std::set, so rbegin() is the maximum
+ }
uint64_t get_last_segment_seq() const {
ceph_assert(!segments.empty());
return segments.rbegin()->first;
// Submit event `e` with optional completion context `c`: takes submit_mutex,
// hands the event to _submit_entry(), runs segment upkeep, and then notifies
// all waiters on submit_cond.
// NOTE(review): _segment_upkeep() asserts that mds_lock is held by the calling
// thread, so callers of submit_entry() must hold mds_lock -- confirm at call sites.
void submit_entry(LogEvent *e, MDSLogContextBase* c = 0) {
std::lock_guard l(submit_mutex); // held until the function returns
_submit_entry(e, c);
- _segment_upkeep(e);
+ _segment_upkeep(); // may itself submit a segment-boundary or test subtree-map event
submit_cond.notify_all();
}
// -- segments --
std::map<uint64_t,LogSegment*> segments;
- std::set<LogSegment*> expiring_segments;
- std::set<LogSegment*> expired_segments;
std::size_t pre_segments_size = 0; // the num of segments when the mds finished replay-journal, to calc the num of segments growing
- uint64_t event_seq = 0;
+ LogSegment::seq_t event_seq = 0;
uint64_t expiring_events = 0;
uint64_t expired_events = 0;
friend class C_MDL_Flushed;
friend class C_OFT_Committed;
- // -- segments --
- void _start_new_segment();
- void _prepare_new_segment();
- void _segment_upkeep(LogEvent* le);
- void _journal_segment_subtree_map(MDSContext *onsync);
- void _submit_entry(LogEvent *e, MDSLogContextBase *c);
-
void try_to_commit_open_file_table(uint64_t last_seq);
+ LogSegment* _start_new_segment(SegmentBoundary* sb);
+ void _segment_upkeep();
+ void _submit_entry(LogEvent* e, MDSLogContextBase* c);
void try_expire(LogSegment *ls, int op_prio);
void _maybe_expired(LogSegment *ls, int op_prio);
bool debug_subtrees;
uint64_t events_per_segment;
+ uint64_t major_segment_event_ratio;
int64_t max_events;
uint64_t max_segments;
bool pause;
+
+ std::set<uint64_t> major_segments;
+ std::set<LogSegment*> expired_segments;
+ std::set<LogSegment*> expiring_segments;
+ uint64_t events_since_last_major_segment = 0;
};
#endif