]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: use mutex to protect log segment list
authorYan, Zheng <zheng.z.yan@intel.com>
Wed, 25 Jun 2014 08:09:15 +0000 (16:09 +0800)
committerYan, Zheng <zheng.z.yan@intel.com>
Wed, 2 Jul 2014 06:01:27 +0000 (14:01 +0800)
prevent race between creating new log segment and trimming old
segment.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
src/mds/MDCache.cc
src/mds/MDLog.cc
src/mds/MDLog.h

index 3aa825af0cf57ae28e7b4e4cb78d9e668581225b..49b3ff0315a77885a489b96c43a1f18207275782 100644 (file)
@@ -2273,7 +2273,7 @@ ESubtreeMap *MDCache::create_subtree_map()
   show_subtrees();
 
   ESubtreeMap *le = new ESubtreeMap();
-  mds->mdlog->start_entry(le);
+  mds->mdlog->_start_entry(le);
   
   CDir *mydir = 0;
   if (myin) {
index 2f2f262802167a4f523a1ce4bccaf4aeca32972c..738fe656d64925d435e23acdcd256b9fc8615f0d 100644 (file)
@@ -170,8 +170,10 @@ void MDLog::append()
 
 // -------------------------------------------------
 
-void MDLog::start_entry(LogEvent *e)
+void MDLog::_start_entry(LogEvent *e)
 {
+  assert(submit_mutex.is_locked_by_me());
+
   assert(cur_event == NULL);
   cur_event = e;
   e->set_start_off(get_write_pos());
@@ -192,9 +194,12 @@ void MDLog::cancel_entry(LogEvent *le)
   delete le;
 }
 
-void MDLog::submit_entry(LogEvent *le, Context *c) 
+void MDLog::_submit_entry(LogEvent *le, Context *c)
 {
+  assert(submit_mutex.is_locked_by_me());
   assert(!mds->is_any_replay());
+  assert(!capped);
+
   assert(le == cur_event);
   cur_event = NULL;
 
@@ -256,7 +261,7 @@ void MDLog::submit_entry(LogEvent *le, Context *c)
   } else if (journaler->get_write_pos()/period != last_seg/period) {
     dout(10) << "submit_entry also starting new segment: last = " << last_seg
             << ", cur pos = " << journaler->get_write_pos() << dendl;
-    start_new_segment();
+    _start_new_segment();
   } else if (g_conf->mds_debug_subtrees &&
             le->get_type() != EVENT_SUBTREEMAP_TEST) {
     // debug: journal this every time to catch subtree replay bugs.
@@ -264,7 +269,7 @@ void MDLog::submit_entry(LogEvent *le, Context *c)
     // LogSegment boundary on replay.
     LogEvent *sle = mds->mdcache->create_subtree_map();
     sle->set_type(EVENT_SUBTREEMAP_TEST);
-    submit_entry(sle);
+    _submit_entry(sle, NULL);
   }
 
   delete le;
@@ -390,14 +395,16 @@ void MDLog::shutdown()
 // -----------------------------
 // segments
 
-void MDLog::start_new_segment()
+void MDLog::_start_new_segment()
 {
-  prepare_new_segment();
-  journal_segment_subtree_map(NULL);
+  _prepare_new_segment();
+  _journal_segment_subtree_map(NULL);
 }
 
-void MDLog::prepare_new_segment()
+void MDLog::_prepare_new_segment()
 {
+  assert(submit_mutex.is_locked_by_me());
+
   uint64_t seq = event_seq + 1;
   dout(7) << __func__ << " seq " << seq << dendl;
 
@@ -412,12 +419,14 @@ void MDLog::prepare_new_segment()
   mds->mdcache->advance_stray();
 }
 
-void MDLog::journal_segment_subtree_map(Context *onsync)
+void MDLog::_journal_segment_subtree_map(Context *onsync)
 {
+  assert(submit_mutex.is_locked_by_me());
+
   dout(7) << __func__ << dendl;
   ESubtreeMap *sle = mds->mdcache->create_subtree_map();
   sle->event_seq = get_last_segment_seq();
-  submit_entry(sle, onsync);
+  _submit_entry(sle, onsync);
 }
 
 void MDLog::trim(int m)
@@ -427,6 +436,8 @@ void MDLog::trim(int m)
   if (m >= 0)
     max_events = m;
 
+  submit_mutex.Lock();
+
   // trim!
   dout(10) << "trim " 
           << segments.size() << " / " << max_segments << " segments, " 
@@ -435,8 +446,10 @@ void MDLog::trim(int m)
           << ", " << expired_segments.size() << " (" << expired_events << ") expired"
           << dendl;
 
-  if (segments.empty())
+  if (segments.empty()) {
+    submit_mutex.Unlock();
     return;
+  }
 
   // hack: only trim for a few seconds at a time
   utime_t stop = ceph_clock_now(g_ceph_context);
@@ -465,7 +478,8 @@ void MDLog::trim(int m)
     assert(ls);
     ++p;
     
-    if (ls->end > journaler->get_write_safe_pos()) {
+    if (pending_events.count(ls->seq) ||
+       ls->end > journaler->get_write_safe_pos()) {
       dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe "
              << journaler->get_write_safe_pos() << " < end " << ls->end << dendl;
       break;
@@ -477,11 +491,20 @@ void MDLog::trim(int m)
       dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
              << ", " << ls->num_events << " events" << dendl;
     } else {
+      assert(expiring_segments.count(ls) == 0);
+      expiring_segments.insert(ls);
+      expiring_events += ls->num_events;
+      submit_mutex.Unlock();
+
+      uint64_t last_seq = ls->seq;
       try_expire(ls, op_prio);
+
+      submit_mutex.Lock();
+      p = segments.lower_bound(last_seq + 1);
     }
   }
 
-  // discard expired segments
+  // discard expired segments and unlock submit_mutex
   _trim_expired_segments();
 }
 
@@ -490,16 +513,19 @@ void MDLog::try_expire(LogSegment *ls, int op_prio)
 {
   C_GatherBuilder gather_bld(g_ceph_context);
   ls->try_to_expire(mds, gather_bld, op_prio);
+
   if (gather_bld.has_subs()) {
-    assert(expiring_segments.count(ls) == 0);
-    expiring_segments.insert(ls);
-    expiring_events += ls->num_events;
     dout(5) << "try_expire expiring segment " << ls->seq << "/" << ls->offset << dendl;
     gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio));
     gather_bld.activate();
   } else {
     dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl;
+    submit_mutex.Lock();
+    assert(expiring_segments.count(ls));
+    expiring_segments.erase(ls);
+    expiring_events -= ls->num_events;
     _expired(ls);
+    submit_mutex.Unlock();
   }
   
   logger->set(l_mdl_segexg, expiring_segments.size());
@@ -510,14 +536,13 @@ void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
 {
   dout(10) << "_maybe_expired segment " << ls->seq << "/" << ls->offset
           << ", " << ls->num_events << " events" << dendl;
-  assert(expiring_segments.count(ls));
-  expiring_segments.erase(ls);
-  expiring_events -= ls->num_events;
   try_expire(ls, op_prio);
 }
 
 void MDLog::_trim_expired_segments()
 {
+  assert(submit_mutex.is_locked_by_me());
+
   // trim expired segments?
   bool trimmed = false;
   while (!segments.empty()) {
@@ -546,13 +571,17 @@ void MDLog::_trim_expired_segments()
     delete ls;
     trimmed = true;
   }
-  
+
+  submit_mutex.Unlock();
+
   if (trimmed)
     journaler->write_head(0);
 }
 
 void MDLog::_expired(LogSegment *ls)
 {
+  assert(submit_mutex.is_locked_by_me());
+
   dout(5) << "_expired segment " << ls->seq << "/" << ls->offset
          << ", " << ls->num_events << " events" << dendl;
 
index b64504600ca147b335f508567c181ba290df4649..ac288d20738a97d02539ee42ed3d4d0fc45ad64f 100644 (file)
@@ -169,8 +169,6 @@ public:
     segments.erase(p);
   }
 
-
-private:
   struct C_MDL_WriteError : public Context {
     MDLog *mdlog;
     C_MDL_WriteError(MDLog *m) : mdlog(m) {}
@@ -206,10 +204,27 @@ public:
   ~MDLog();
 
 
+private:
   // -- segments --
-  void start_new_segment();
-  void prepare_new_segment();
-  void journal_segment_subtree_map(Context *onsync);
+  void _start_new_segment();
+  void _prepare_new_segment();
+  void _journal_segment_subtree_map(Context *onsync);
+public:
+  void start_new_segment() {
+    Mutex::Locker l(submit_mutex);
+    _start_new_segment();
+  }
+  void prepare_new_segment() {
+    Mutex::Locker l(submit_mutex);
+    _prepare_new_segment();
+  }
+  void journal_segment_subtree_map(Context *onsync=NULL) {
+    submit_mutex.Lock();
+    _journal_segment_subtree_map(onsync);
+    submit_mutex.Unlock();
+    if (onsync)
+      flush();
+  }
 
   LogSegment *peek_current_segment() {
     return segments.empty() ? NULL : segments.rbegin()->second;
@@ -250,12 +265,21 @@ public:
 private:
   LogEvent *cur_event;
 public:
-  void start_entry(LogEvent *e);
+  void _start_entry(LogEvent *e);
+  void start_entry(LogEvent *e) {
+    Mutex::Locker l(submit_mutex);
+    _start_entry(e);
+  }
   void cancel_entry(LogEvent *e);
-  void submit_entry(LogEvent *e, Context *c = 0);
+  void _submit_entry(LogEvent *e, Context *c);
+  void submit_entry(LogEvent *e, Context *c = 0) {
+    Mutex::Locker l(submit_mutex);
+    _submit_entry(e, c);
+  }
   void start_submit_entry(LogEvent *e, Context *c = 0) {
-    start_entry(e);
-    submit_entry(e, c);
+    Mutex::Locker l(submit_mutex);
+    _start_entry(e);
+    _submit_entry(e, c);
   }
   bool entry_is_open() { return cur_event != NULL; }