]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: defer encoding/submitting log events to separate thread 2037/head
authorYan, Zheng <zheng.z.yan@intel.com>
Fri, 27 Jun 2014 07:12:26 +0000 (15:12 +0800)
committerYan, Zheng <zheng.z.yan@intel.com>
Wed, 2 Jul 2014 06:01:27 +0000 (14:01 +0800)
Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
src/common/config_opts.h
src/mds/MDLog.cc
src/mds/MDLog.h

index 1c9dbacc71d0bf97bb812fef6679717b931ff8e1..ecb46d45cc3b6d884714ea4c19c53b7139b35674 100644 (file)
@@ -313,6 +313,7 @@ OPTION(mds_default_dir_hash, OPT_INT, CEPH_STR_HASH_RJENKINS)
 OPTION(mds_log, OPT_BOOL, true)
 OPTION(mds_log_skip_corrupt_events, OPT_BOOL, false)
 OPTION(mds_log_max_events, OPT_INT, -1)
+OPTION(mds_log_events_per_segment, OPT_INT, 1024)
 OPTION(mds_log_segment_size, OPT_INT, 0)  // segment size for mds log,
              // defaults to g_default_file_layout.fl_object_size (4MB)
 OPTION(mds_log_max_segments, OPT_INT, 30)
index 738fe656d64925d435e23acdcd256b9fc8615f0d..194f5001f4049ce2280ef81e013d6c9e6b5159b9 100644 (file)
@@ -213,54 +213,35 @@ void MDLog::_submit_entry(LogEvent *le, Context *c)
 
   // let the event register itself in the segment
   assert(!segments.empty());
-  le->_segment = segments.rbegin()->second;
-  le->_segment->num_events++;
-  le->update_segment();
+  LogSegment *ls = segments.rbegin()->second;
+  ls->num_events++;
 
+  le->_segment = ls;
+  le->update_segment();
   le->set_stamp(ceph_clock_now(g_ceph_context));
-  
-  num_events++;
-  assert(!capped);
-  
-  // encode it, with event type
-  {
-    bufferlist bl;
-    le->encode_with_header(bl);
-
-    dout(5) << "submit_entry " << journaler->get_write_pos() << "~" << bl.length()
-           << " : " << *le << dendl;
-      
-    // journal it.
-    journaler->append_entry(bl);  // bl is destroyed.
-  }
 
-  le->_segment->end = journaler->get_write_pos();
+  pending_events[ls->seq].push_back(PendingEvent(le, c));
+  num_events++;
 
   if (logger) {
     logger->inc(l_mdl_evadd);
     logger->set(l_mdl_ev, num_events);
-    logger->set(l_mdl_wrpos, journaler->get_write_pos());
   }
 
   unflushed++;
-
-  if (c)
-    journaler->wait_for_flush(c);
   
-  // start a new segment?
-  //  FIXME: should this go elsewhere?
-  uint64_t last_seg = get_last_segment_offset();
   uint64_t period = journaler->get_layout_period();
-  // start a new segment if there are none or if we reach end of last segment
+  // start a new segment?
   if (le->get_type() == EVENT_SUBTREEMAP ||
       (le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) {
     // avoid infinite loop when ESubtreeMap is very large.
     // do not insert ESubtreeMap among EImportFinish events that finish
     // disambiguate imports. Because the ESubtreeMap reflects the subtree
     // state when all EImportFinish events are replayed.
-  } else if (journaler->get_write_pos()/period != last_seg/period) {
-    dout(10) << "submit_entry also starting new segment: last = " << last_seg
-            << ", cur pos = " << journaler->get_write_pos() << dendl;
+  } else if (ls->end/period != ls->offset/period ||
+            ls->num_events >= g_conf->mds_log_events_per_segment) {
+    dout(10) << "submit_entry also starting new segment: last = "
+            << ls->seq  << "/" << ls->offset << ", event seq = " << event_seq << dendl;
     _start_new_segment();
   } else if (g_conf->mds_debug_subtrees &&
             le->get_type() != EVENT_SUBTREEMAP_TEST) {
@@ -271,8 +252,6 @@ void MDLog::_submit_entry(LogEvent *le, Context *c)
     sle->set_type(EVENT_SUBTREEMAP_TEST);
     _submit_entry(sle, NULL);
   }
-
-  delete le;
 }
 
 void MDLog::_submit_thread()
@@ -352,20 +331,43 @@ void MDLog::_submit_thread()
 
 void MDLog::wait_for_safe(Context *c)
 {
-  if (g_conf->mds_log) {
-    // wait
-    journaler->wait_for_flush(c);
-  } else {
+  if (!g_conf->mds_log) {
     // hack: bypass.
     c->complete(0);
+    return;
+  }
+
+  submit_mutex.Lock();
+
+  bool no_pending = true;
+  if (!pending_events.empty()) {
+    pending_events.rbegin()->second.push_back(PendingEvent(NULL, c));
+    no_pending = false;
+    submit_cond.Signal();
   }
+
+  submit_mutex.Unlock();
+
+  if (no_pending && c)
+    journaler->wait_for_flush(c);
 }
 
 void MDLog::flush()
 {
-  if (unflushed)
-    journaler->flush();
+  submit_mutex.Lock();
+
+  bool do_flush = unflushed > 0;
   unflushed = 0;
+  if (!pending_events.empty()) {
+    pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
+    do_flush = false;
+    submit_cond.Signal();
+  }
+
+  submit_mutex.Unlock();
+
+  if (do_flush)
+    journaler->flush();
 }
 
 void MDLog::cap()
@@ -408,7 +410,7 @@ void MDLog::_prepare_new_segment()
   uint64_t seq = event_seq + 1;
   dout(7) << __func__ << " seq " << seq << dendl;
 
-  segments[seq] = new LogSegment(seq, journaler->get_write_pos());
+  segments[seq] = new LogSegment(seq);
 
   logger->inc(l_mdl_segadd);
   logger->set(l_mdl_seg, segments.size());
index ac288d20738a97d02539ee42ed3d4d0fc45ad64f..a4d8d2537ce6fab654c2c8aa1143ff590b857550 100644 (file)
@@ -151,15 +151,10 @@ protected:
   friend class C_MDS_WroteImportMap;
   friend class MDCache;
 
-public:
   uint64_t get_last_segment_seq() {
     assert(!segments.empty());
     return segments.rbegin()->first;
   }
-  uint64_t get_last_segment_offset() {
-    assert(!segments.empty());
-    return segments.rbegin()->first;
-  }
   LogSegment *get_oldest_segment() {
     return segments.begin()->second;
   }
@@ -275,11 +270,13 @@ public:
   void submit_entry(LogEvent *e, Context *c = 0) {
     Mutex::Locker l(submit_mutex);
     _submit_entry(e, c);
+    submit_cond.Signal();
   }
   void start_submit_entry(LogEvent *e, Context *c = 0) {
     Mutex::Locker l(submit_mutex);
     _start_entry(e);
     _submit_entry(e, c);
+    submit_cond.Signal();
   }
   bool entry_is_open() { return cur_event != NULL; }