]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: add thread to encode/submit log events
authorYan, Zheng <zheng.z.yan@intel.com>
Wed, 25 Jun 2014 07:09:33 +0000 (15:09 +0800)
committerYan, Zheng <zheng.z.yan@intel.com>
Wed, 2 Jul 2014 06:01:25 +0000 (14:01 +0800)
Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
src/mds/MDLog.cc
src/mds/MDLog.h
src/mds/MDS.cc

index f98317a277c21757a41a22f2954229be02f75689..2f2f262802167a4f523a1ce4bccaf4aeca32972c 100644 (file)
@@ -139,6 +139,8 @@ void MDLog::create(Context *c)
 
   logger->set(l_mdl_expos, journaler->get_expire_pos());
   logger->set(l_mdl_wrpos, journaler->get_write_pos());
+
+  submit_thread.create();
 }
 
 void MDLog::open(Context *c)
@@ -148,6 +150,8 @@ void MDLog::open(Context *c)
   recovery_thread.set_completion(c);
   recovery_thread.create();
   recovery_thread.detach();
+
+  submit_thread.create();
   // either append() or replay() will follow.
 }
 
@@ -266,6 +270,81 @@ void MDLog::submit_entry(LogEvent *le, Context *c)
   delete le;
 }
 
+void MDLog::_submit_thread()
+{
+  dout(10) << "_submit_thread start" << dendl;
+
+  submit_mutex.Lock();
+
+  while (!stopping) {
+    map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
+    if (it == pending_events.end()) {
+      submit_cond.Wait(submit_mutex);
+      continue;
+    }
+
+    if (it->second.empty()) {
+      pending_events.erase(it);
+      continue;
+    }
+
+    PendingEvent data = it->second.front();
+    it->second.pop_front();
+
+    submit_mutex.Unlock();
+
+    if (data.le) {
+      LogEvent *le = data.le;
+      LogSegment *ls = le->_segment;
+      // encode it, with event type
+      bufferlist bl;
+      le->encode_with_header(bl);
+
+      mds->mds_lock.Lock();
+
+      uint64_t write_pos = journaler->get_write_pos();
+
+      le->set_start_off(write_pos);
+      if (le->get_type() == EVENT_SUBTREEMAP)
+       ls->offset = write_pos;
+
+      dout(5) << "_submit_thread " << write_pos << "~" << bl.length()
+             << " : " << *le << dendl;
+
+      // journal it.
+      journaler->append_entry(bl);  // bl is destroyed.
+      ls->end = journaler->get_write_pos();
+
+      if (data.fin)
+       journaler->wait_for_flush(data.fin);
+      if (data.flush)
+       journaler->flush();
+
+      mds->mds_lock.Unlock();
+
+      if (logger)
+       logger->set(l_mdl_wrpos, ls->end);
+
+      delete le;
+    } else {
+      mds->mds_lock.Lock();
+      if (data.fin)
+       journaler->wait_for_flush(data.fin);
+      if (data.flush)
+       journaler->flush();
+      mds->mds_lock.Unlock();
+    }
+
+    submit_mutex.Lock();
+    if (data.flush)
+      unflushed = 0;
+    else if (data.le)
+      unflushed++;
+  }
+
+  submit_mutex.Unlock();
+}
+
 void MDLog::wait_for_safe(Context *c)
 {
   if (g_conf->mds_log) {
@@ -290,6 +369,23 @@ void MDLog::cap()
   capped = true;
 }
 
+void MDLog::shutdown()
+{
+  dout(5) << "shutdown" << dendl;
+  if (!submit_thread.is_started())
+    return;
+
+  assert(mds->mds_lock.is_locked_by_me());
+  mds->mds_lock.Unlock();
+
+  submit_mutex.Lock();
+  stopping = true;
+  submit_cond.Signal();
+  submit_mutex.Unlock();
+
+  mds->mds_lock.Lock();
+}
+
 
 // -----------------------------
 // segments
index 318c6344e7dc00990e5b2c547692c7084f46aa3a..b64504600ca147b335f508567c181ba290df4649 100644 (file)
@@ -70,6 +70,8 @@ protected:
 
   bool capped;
 
+  bool stopping;
+
   inodeno_t ino;
   Journaler *journaler;
 
@@ -121,6 +123,29 @@ protected:
   int expiring_events;
   int expired_events;
 
+  struct PendingEvent {
+    LogEvent *le;
+    Context *fin;
+    bool flush;
+    PendingEvent(LogEvent *e, Context *c, bool f=false) : le(e), fin(c), flush(f) {}
+  };
+
+  map<uint64_t,list<PendingEvent> > pending_events; // log segment -> event list
+  Mutex submit_mutex;
+  Cond submit_cond;
+
+  void _submit_thread();
+  class SubmitThread : public Thread {
+    MDLog *log;
+  public:
+    SubmitThread(MDLog *l) : log(l) {}
+    void* entry() {
+      log->_submit_thread();
+      return 0;
+    }
+  } submit_thread;
+  friend class SubmitThread;
+
   // -- subtreemaps --
   friend class ESubtreeMap;
   friend class C_MDS_WroteImportMap;
@@ -168,12 +193,15 @@ public:
                  num_events(0), 
                  unflushed(0),
                  capped(false),
+                 stopping(false),
                  journaler(0),
                  logger(0),
                  replay_thread(this),
                  already_replayed(false),
                  recovery_thread(this),
                  event_seq(0), expiring_events(0), expired_events(0),
+                 submit_mutex("MDLog::submit_mutex"),
+                 submit_thread(this),
                  cur_event(NULL) { }             
   ~MDLog();
 
@@ -216,6 +244,8 @@ public:
   bool is_capped() { return capped; }
   void cap();
 
+  void shutdown();
+
   // -- events --
 private:
   LogEvent *cur_event;
index e97deb0f8248d245c4d063f485b6ed9b3aff325b..7aca856f4ce0976a146be1405c85246e058781f1 100644 (file)
@@ -1753,6 +1753,8 @@ void MDS::suicide()
   dout(1) << "suicide.  wanted " << ceph_mds_state_name(want_state)
          << ", now " << ceph_mds_state_name(state) << dendl;
 
+  mdlog->shutdown();
+
   // stop timers
   if (beacon_sender) {
     timer.cancel_event(beacon_sender);