From: Yan, Zheng Date: Wed, 25 Jun 2014 08:09:15 +0000 (+0800) Subject: mds: use mutex to protect log segment list X-Git-Tag: v0.84~138^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=44199d6afda1fc991939a3e5d8882a7265c5d713;p=ceph.git mds: use mutex to protect log segment list prevent race between creating new log segment and trimming old segment. Signed-off-by: Yan, Zheng --- diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 3aa825af0cf57..49b3ff0315a77 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -2273,7 +2273,7 @@ ESubtreeMap *MDCache::create_subtree_map() show_subtrees(); ESubtreeMap *le = new ESubtreeMap(); - mds->mdlog->start_entry(le); + mds->mdlog->_start_entry(le); CDir *mydir = 0; if (myin) { diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index 2f2f262802167..738fe656d6492 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -170,8 +170,10 @@ void MDLog::append() // ------------------------------------------------- -void MDLog::start_entry(LogEvent *e) +void MDLog::_start_entry(LogEvent *e) { + assert(submit_mutex.is_locked_by_me()); + assert(cur_event == NULL); cur_event = e; e->set_start_off(get_write_pos()); @@ -192,9 +194,12 @@ void MDLog::cancel_entry(LogEvent *le) delete le; } -void MDLog::submit_entry(LogEvent *le, Context *c) +void MDLog::_submit_entry(LogEvent *le, Context *c) { + assert(submit_mutex.is_locked_by_me()); assert(!mds->is_any_replay()); + assert(!capped); + assert(le == cur_event); cur_event = NULL; @@ -256,7 +261,7 @@ void MDLog::submit_entry(LogEvent *le, Context *c) } else if (journaler->get_write_pos()/period != last_seg/period) { dout(10) << "submit_entry also starting new segment: last = " << last_seg << ", cur pos = " << journaler->get_write_pos() << dendl; - start_new_segment(); + _start_new_segment(); } else if (g_conf->mds_debug_subtrees && le->get_type() != EVENT_SUBTREEMAP_TEST) { // debug: journal this every time to catch subtree replay bugs. @@ -264,7 +269,7 @@ void MDLog::submit_entry(LogEvent *le, Context *c) // LogSegment boundary on replay. LogEvent *sle = mds->mdcache->create_subtree_map(); sle->set_type(EVENT_SUBTREEMAP_TEST); - submit_entry(sle); + _submit_entry(sle, NULL); } delete le; @@ -390,14 +395,16 @@ void MDLog::shutdown() // ----------------------------- // segments -void MDLog::start_new_segment() +void MDLog::_start_new_segment() { - prepare_new_segment(); - journal_segment_subtree_map(NULL); + _prepare_new_segment(); + _journal_segment_subtree_map(NULL); } -void MDLog::prepare_new_segment() +void MDLog::_prepare_new_segment() { + assert(submit_mutex.is_locked_by_me()); + uint64_t seq = event_seq + 1; dout(7) << __func__ << " seq " << seq << dendl; @@ -412,12 +419,14 @@ void MDLog::prepare_new_segment() mds->mdcache->advance_stray(); } -void MDLog::journal_segment_subtree_map(Context *onsync) +void MDLog::_journal_segment_subtree_map(Context *onsync) { + assert(submit_mutex.is_locked_by_me()); + dout(7) << __func__ << dendl; ESubtreeMap *sle = mds->mdcache->create_subtree_map(); sle->event_seq = get_last_segment_seq(); - submit_entry(sle, onsync); + _submit_entry(sle, onsync); } void MDLog::trim(int m) @@ -427,6 +436,8 @@ void MDLog::trim(int m) if (m >= 0) max_events = m; + submit_mutex.Lock(); + // trim! dout(10) << "trim " << segments.size() << " / " << max_segments << " segments, " @@ -435,8 +446,10 @@ void MDLog::trim(int m) << ", " << expired_segments.size() << " (" << expired_events << ") expired" << dendl; - if (segments.empty()) + if (segments.empty()) { + submit_mutex.Unlock(); return; + } // hack: only trim for a few seconds at a time utime_t stop = ceph_clock_now(g_ceph_context); @@ -465,7 +478,8 @@ void MDLog::trim(int m) assert(ls); ++p; - if (ls->end > journaler->get_write_safe_pos()) { + if (pending_events.count(ls->seq) || + ls->end > journaler->get_write_safe_pos()) { dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe " << journaler->get_write_safe_pos() << " < end " << ls->end << dendl; break; @@ -477,11 +491,20 @@ void MDLog::trim(int m) dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset << ", " << ls->num_events << " events" << dendl; } else { + assert(expiring_segments.count(ls) == 0); + expiring_segments.insert(ls); + expiring_events += ls->num_events; + submit_mutex.Unlock(); + + uint64_t last_seq = ls->seq; try_expire(ls, op_prio); + + submit_mutex.Lock(); + p = segments.lower_bound(last_seq + 1); } } - // discard expired segments + // discard expired segments and unlock submit_mutex _trim_expired_segments(); } @@ -490,16 +513,19 @@ void MDLog::try_expire(LogSegment *ls, int op_prio) { C_GatherBuilder gather_bld(g_ceph_context); ls->try_to_expire(mds, gather_bld, op_prio); + if (gather_bld.has_subs()) { - assert(expiring_segments.count(ls) == 0); - expiring_segments.insert(ls); - expiring_events += ls->num_events; dout(5) << "try_expire expiring segment " << ls->seq << "/" << ls->offset << dendl; gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio)); gather_bld.activate(); } else { dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl; + submit_mutex.Lock(); + assert(expiring_segments.count(ls)); + expiring_segments.erase(ls); + expiring_events -= ls->num_events; _expired(ls); + submit_mutex.Unlock(); } logger->set(l_mdl_segexg, expiring_segments.size()); @@ -510,14 +536,13 @@ void MDLog::_maybe_expired(LogSegment *ls, int op_prio) { dout(10) << "_maybe_expired segment " << ls->seq << "/" << ls->offset << ", " << ls->num_events << " events" << dendl; - assert(expiring_segments.count(ls)); - expiring_segments.erase(ls); - expiring_events -= ls->num_events; try_expire(ls, op_prio); } void MDLog::_trim_expired_segments() { + assert(submit_mutex.is_locked_by_me()); + // trim expired segments? bool trimmed = false; while (!segments.empty()) { @@ -546,13 +571,17 @@ void MDLog::_trim_expired_segments() delete ls; trimmed = true; } - + + submit_mutex.Unlock(); + if (trimmed) journaler->write_head(0); } void MDLog::_expired(LogSegment *ls) { + assert(submit_mutex.is_locked_by_me()); + dout(5) << "_expired segment " << ls->seq << "/" << ls->offset << ", " << ls->num_events << " events" << dendl; diff --git a/src/mds/MDLog.h b/src/mds/MDLog.h index b64504600ca14..ac288d20738a9 100644 --- a/src/mds/MDLog.h +++ b/src/mds/MDLog.h @@ -169,8 +169,6 @@ public: segments.erase(p); } - -private: struct C_MDL_WriteError : public Context { MDLog *mdlog; C_MDL_WriteError(MDLog *m) : mdlog(m) {} @@ -206,10 +204,27 @@ public: ~MDLog(); +private: // -- segments -- - void start_new_segment(); - void prepare_new_segment(); - void journal_segment_subtree_map(Context *onsync); + void _start_new_segment(); + void _prepare_new_segment(); + void _journal_segment_subtree_map(Context *onsync); +public: + void start_new_segment() { + Mutex::Locker l(submit_mutex); + _start_new_segment(); + } + void prepare_new_segment() { + Mutex::Locker l(submit_mutex); + _prepare_new_segment(); + } + void journal_segment_subtree_map(Context *onsync=NULL) { + submit_mutex.Lock(); + _journal_segment_subtree_map(onsync); + submit_mutex.Unlock(); + if (onsync) + flush(); + } LogSegment *peek_current_segment() { return segments.empty() ? NULL : segments.rbegin()->second; @@ -250,12 +265,21 @@ public: private: LogEvent *cur_event; public: - void start_entry(LogEvent *e); + void _start_entry(LogEvent *e); + void start_entry(LogEvent *e) { + Mutex::Locker l(submit_mutex); + _start_entry(e); + } void cancel_entry(LogEvent *e); - void submit_entry(LogEvent *e, Context *c = 0); + void _submit_entry(LogEvent *e, Context *c); + void submit_entry(LogEvent *e, Context *c = 0) { + Mutex::Locker l(submit_mutex); + _submit_entry(e, c); + } void start_submit_entry(LogEvent *e, Context *c = 0) { - start_entry(e); - submit_entry(e, c); + Mutex::Locker l(submit_mutex); + _start_entry(e); + _submit_entry(e, c); } bool entry_is_open() { return cur_event != NULL; }