// -------------------------------------------------
-void MDLog::start_entry(LogEvent *e)
+void MDLog::_start_entry(LogEvent *e)
{
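+ // _start_entry() expects the caller to already hold submit_mutex;
+ // the public start_entry() wrapper (see the header changes below) takes the lock itself.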
+ assert(submit_mutex.is_locked_by_me());
+
assert(cur_event == NULL);
cur_event = e;
e->set_start_off(get_write_pos());
}
-void MDLog::submit_entry(LogEvent *le, Context *c)
+void MDLog::_submit_entry(LogEvent *le, Context *c)
{
+ assert(submit_mutex.is_locked_by_me());
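+ // no new entries may be submitted during replay or once the log has been capped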
assert(!mds->is_any_replay());
+ assert(!capped);
+
assert(le == cur_event);
cur_event = NULL;
} else if (journaler->get_write_pos()/period != last_seg/period) {
dout(10) << "submit_entry also starting new segment: last = " << last_seg
<< ", cur pos = " << journaler->get_write_pos() << dendl;
- start_new_segment();
+ _start_new_segment();
} else if (g_conf->mds_debug_subtrees &&
le->get_type() != EVENT_SUBTREEMAP_TEST) {
// debug: journal this every time to catch subtree replay bugs.
// LogSegment boundary on replay.
LogEvent *sle = mds->mdcache->create_subtree_map();
sle->set_type(EVENT_SUBTREEMAP_TEST);
- submit_entry(sle);
+ _submit_entry(sle, NULL);
}
delete le;
// -----------------------------
// segments
-void MDLog::start_new_segment()
+void MDLog::_start_new_segment()
{
- prepare_new_segment();
- journal_segment_subtree_map(NULL);
+ _prepare_new_segment();
+ _journal_segment_subtree_map(NULL);
}
-void MDLog::prepare_new_segment()
+void MDLog::_prepare_new_segment()
{
+ assert(submit_mutex.is_locked_by_me());
+
uint64_t seq = event_seq + 1;
dout(7) << __func__ << " seq " << seq << dendl;
mds->mdcache->advance_stray();
}
-void MDLog::journal_segment_subtree_map(Context *onsync)
+void MDLog::_journal_segment_subtree_map(Context *onsync)
{
+ assert(submit_mutex.is_locked_by_me());
+
dout(7) << __func__ << dendl;
ESubtreeMap *sle = mds->mdcache->create_subtree_map();
sle->event_seq = get_last_segment_seq();
- submit_entry(sle, onsync);
+ _submit_entry(sle, onsync);
}
void MDLog::trim(int m)
if (m >= 0)
max_events = m;
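+ // submit_mutex protects segments and the expiring/expired bookkeeping below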
+ submit_mutex.Lock();
+
// trim!
dout(10) << "trim "
<< segments.size() << " / " << max_segments << " segments, "
<< ", " << expired_segments.size() << " (" << expired_events << ") expired"
<< dendl;
- if (segments.empty())
+ if (segments.empty()) {
+ submit_mutex.Unlock();
return;
+ }
// hack: only trim for a few seconds at a time
utime_t stop = ceph_clock_now(g_ceph_context);
assert(ls);
++p;
- if (ls->end > journaler->get_write_safe_pos()) {
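+ // a segment that still has events pending submission cannot be trimmed yet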
+ if (pending_events.count(ls->seq) ||
+ ls->end > journaler->get_write_safe_pos()) {
dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe "
<< journaler->get_write_safe_pos() << " < end " << ls->end << dendl;
break;
dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
<< ", " << ls->num_events << " events" << dendl;
} else {
+ assert(expiring_segments.count(ls) == 0);
+ expiring_segments.insert(ls);
+ expiring_events += ls->num_events;
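+ // record the segment as expiring while still holding the lock,
+ // then release submit_mutex for the expire pass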
+ submit_mutex.Unlock();
+
+ uint64_t last_seq = ls->seq;
try_expire(ls, op_prio);
+
+ submit_mutex.Lock();
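+ // segments may have changed while the lock was dropped; re-find our position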
+ p = segments.lower_bound(last_seq + 1);
}
}
- // discard expired segments
+ // discard expired segments and unlock submit_mutex
_trim_expired_segments();
}
{
C_GatherBuilder gather_bld(g_ceph_context);
ls->try_to_expire(mds, gather_bld, op_prio);
+
if (gather_bld.has_subs()) {
- assert(expiring_segments.count(ls) == 0);
- expiring_segments.insert(ls);
- expiring_events += ls->num_events;
dout(5) << "try_expire expiring segment " << ls->seq << "/" << ls->offset << dendl;
gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio));
gather_bld.activate();
} else {
dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl;
+ submit_mutex.Lock();
+ assert(expiring_segments.count(ls));
+ expiring_segments.erase(ls);
+ expiring_events -= ls->num_events;
_expired(ls);
+ submit_mutex.Unlock();
}
logger->set(l_mdl_segexg, expiring_segments.size());
{
dout(10) << "_maybe_expired segment " << ls->seq << "/" << ls->offset
<< ", " << ls->num_events << " events" << dendl;
- assert(expiring_segments.count(ls));
- expiring_segments.erase(ls);
- expiring_events -= ls->num_events;
try_expire(ls, op_prio);
}
void MDLog::_trim_expired_segments()
{
+ assert(submit_mutex.is_locked_by_me());
+
// trim expired segments?
bool trimmed = false;
while (!segments.empty()) {
delete ls;
trimmed = true;
}
-
+
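+ // done touching segments; release submit_mutex before updating the journal head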
+ submit_mutex.Unlock();
+
if (trimmed)
journaler->write_head(0);
}
void MDLog::_expired(LogSegment *ls)
{
+ assert(submit_mutex.is_locked_by_me());
+
dout(5) << "_expired segment " << ls->seq << "/" << ls->offset
<< ", " << ls->num_events << " events" << dendl;
segments.erase(p);
}
-
-private:
struct C_MDL_WriteError : public Context {
MDLog *mdlog;
C_MDL_WriteError(MDLog *m) : mdlog(m) {}
~MDLog();
+private:
// -- segments --
- void start_new_segment();
- void prepare_new_segment();
- void journal_segment_subtree_map(Context *onsync);
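+ // the _xxx() forms below require submit_mutex to be held by the caller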
+ void _start_new_segment();
+ void _prepare_new_segment();
+ void _journal_segment_subtree_map(Context *onsync);
+public:
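+ // public wrappers that take submit_mutex themselves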
+ void start_new_segment() {
+ Mutex::Locker l(submit_mutex);
+ _start_new_segment();
+ }
+ void prepare_new_segment() {
+ Mutex::Locker l(submit_mutex);
+ _prepare_new_segment();
+ }
+ void journal_segment_subtree_map(Context *onsync=NULL) {
+ submit_mutex.Lock();
+ _journal_segment_subtree_map(onsync);
+ submit_mutex.Unlock();
+ if (onsync)
+ flush();
+ }
LogSegment *peek_current_segment() {
return segments.empty() ? NULL : segments.rbegin()->second;
private:
LogEvent *cur_event;
public:
- void start_entry(LogEvent *e);
+ void _start_entry(LogEvent *e);
+ void start_entry(LogEvent *e) {
+ Mutex::Locker l(submit_mutex);
+ _start_entry(e);
+ }
void cancel_entry(LogEvent *e);
- void submit_entry(LogEvent *e, Context *c = 0);
+ void _submit_entry(LogEvent *e, Context *c);
+ void submit_entry(LogEvent *e, Context *c = 0) {
+ Mutex::Locker l(submit_mutex);
+ _submit_entry(e, c);
+ }
void start_submit_entry(LogEvent *e, Context *c = 0) {
- start_entry(e);
- submit_entry(e, c);
+ Mutex::Locker l(submit_mutex);
+ _start_entry(e);
+ _submit_entry(e, c);
}
bool entry_is_open() { return cur_event != NULL; }