From: Yan, Zheng
Date: Wed, 25 Jun 2014 07:09:33 +0000 (+0800)
Subject: mds: add thread to encode/submit log events
X-Git-Tag: v0.84~138^2~2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6d8ccdd656ba247c94216e86ba5aad400f108dbe;p=ceph.git

mds: add thread to encode/submit log events

Signed-off-by: Yan, Zheng
---

Review notes (not part of the commit; text here is ignored when applying):

 * This patch was recovered from a whitespace-collapsed rendering. Line
   breaks, indentation and blank context lines are reconstructed from the
   hunk line counts, so apply it with whitespace tolerance.
 * The template arguments of pending_events (and of its iterator) were
   missing from the rendering. map<uint64_t,list<PendingEvent> > is a
   reconstruction; only "a map whose values support empty()/front()/
   pop_front() on PendingEvent" is established by the code. Confirm the key
   type against the tree.
 * MDLog::shutdown() sets `stopping` and signals submit_cond, then re-takes
   mds_lock and returns without waiting for _submit_thread() to finish, so
   the thread may still be running (and may still want mds_lock) after
   shutdown() returns. Consider waiting for the thread to exit before
   re-taking mds_lock, if Thread offers a join.
 * _submit_thread() leaves its loop as soon as `stopping` is set; entries
   still queued in pending_events at that point are neither submitted nor
   deleted.

diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc
index f98317a277c..2f2f2628021 100644
--- a/src/mds/MDLog.cc
+++ b/src/mds/MDLog.cc
@@ -139,6 +139,8 @@ void MDLog::create(Context *c)
 
   logger->set(l_mdl_expos, journaler->get_expire_pos());
   logger->set(l_mdl_wrpos, journaler->get_write_pos());
+
+  submit_thread.create();
 }
 
 void MDLog::open(Context *c)
@@ -148,6 +150,8 @@ void MDLog::open(Context *c)
   recovery_thread.set_completion(c);
   recovery_thread.create();
   recovery_thread.detach();
+
+  submit_thread.create();
   // either append() or replay() will follow.
 }
 
@@ -266,6 +270,81 @@ void MDLog::submit_entry(LogEvent *le, Context *c)
   delete le;
 }
 
+void MDLog::_submit_thread()
+{
+  dout(10) << "_submit_thread start" << dendl;
+
+  submit_mutex.Lock();
+
+  while (!stopping) {
+    map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
+    if (it == pending_events.end()) {
+      submit_cond.Wait(submit_mutex);
+      continue;
+    }
+
+    if (it->second.empty()) {
+      pending_events.erase(it);
+      continue;
+    }
+
+    PendingEvent data = it->second.front();
+    it->second.pop_front();
+
+    submit_mutex.Unlock();
+
+    if (data.le) {
+      LogEvent *le = data.le;
+      LogSegment *ls = le->_segment;
+      // encode it, with event type
+      bufferlist bl;
+      le->encode_with_header(bl);
+
+      mds->mds_lock.Lock();
+
+      uint64_t write_pos = journaler->get_write_pos();
+
+      le->set_start_off(write_pos);
+      if (le->get_type() == EVENT_SUBTREEMAP)
+        ls->offset = write_pos;
+
+      dout(5) << "_submit_thread " << write_pos << "~" << bl.length()
+              << " : " << *le << dendl;
+
+      // journal it.
+      journaler->append_entry(bl); // bl is destroyed.
+      ls->end = journaler->get_write_pos();
+
+      if (data.fin)
+        journaler->wait_for_flush(data.fin);
+      if (data.flush)
+        journaler->flush();
+
+      mds->mds_lock.Unlock();
+
+      if (logger)
+        logger->set(l_mdl_wrpos, ls->end);
+
+      delete le;
+    } else {
+      mds->mds_lock.Lock();
+      if (data.fin)
+        journaler->wait_for_flush(data.fin);
+      if (data.flush)
+        journaler->flush();
+      mds->mds_lock.Unlock();
+    }
+
+    submit_mutex.Lock();
+    if (data.flush)
+      unflushed = 0;
+    else if (data.le)
+      unflushed++;
+  }
+
+  submit_mutex.Unlock();
+}
+
 void MDLog::wait_for_safe(Context *c)
 {
   if (g_conf->mds_log) {
@@ -290,6 +369,23 @@ void MDLog::cap()
   capped = true;
 }
 
+void MDLog::shutdown()
+{
+  dout(5) << "shutdown" << dendl;
+  if (!submit_thread.is_started())
+    return;
+
+  assert(mds->mds_lock.is_locked_by_me());
+  mds->mds_lock.Unlock();
+
+  submit_mutex.Lock();
+  stopping = true;
+  submit_cond.Signal();
+  submit_mutex.Unlock();
+
+  mds->mds_lock.Lock();
+}
+
 // -----------------------------
 // segments
 
diff --git a/src/mds/MDLog.h b/src/mds/MDLog.h
index 318c6344e7d..b64504600ca 100644
--- a/src/mds/MDLog.h
+++ b/src/mds/MDLog.h
@@ -70,6 +70,8 @@ protected:
 
   bool capped;
 
+  bool stopping;
+
   inodeno_t ino;
   Journaler *journaler;
 
@@ -121,6 +123,29 @@ protected:
   int expiring_events;
   int expired_events;
 
+  struct PendingEvent {
+    LogEvent *le;
+    Context *fin;
+    bool flush;
+    PendingEvent(LogEvent *e, Context *c, bool f=false) : le(e), fin(c), flush(f) {}
+  };
+
+  map<uint64_t,list<PendingEvent> > pending_events; // log segment -> event list
+  Mutex submit_mutex;
+  Cond submit_cond;
+
+  void _submit_thread();
+  class SubmitThread : public Thread {
+    MDLog *log;
+  public:
+    SubmitThread(MDLog *l) : log(l) {}
+    void* entry() {
+      log->_submit_thread();
+      return 0;
+    }
+  } submit_thread;
+  friend class SubmitThread;
+
   // -- subtreemaps --
   friend class ESubtreeMap;
   friend class C_MDS_WroteImportMap;
@@ -168,12 +193,15 @@ public:
                   num_events(0),
                   unflushed(0),
                   capped(false),
+                  stopping(false),
                   journaler(0),
                   logger(0),
                   replay_thread(this),
                   already_replayed(false),
                   recovery_thread(this),
                   event_seq(0), expiring_events(0), expired_events(0),
+                  submit_mutex("MDLog::submit_mutex"),
+                  submit_thread(this),
                   cur_event(NULL) { }
   ~MDLog();
 
@@ -216,6 +244,8 @@ public:
   bool is_capped() { return capped; }
   void cap();
 
+  void shutdown();
+
   // -- events --
 private:
   LogEvent *cur_event;
diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc
index e97deb0f824..7aca856f4ce 100644
--- a/src/mds/MDS.cc
+++ b/src/mds/MDS.cc
@@ -1753,6 +1753,8 @@ void MDS::suicide()
   dout(1) << "suicide. wanted " << ceph_mds_state_name(want_state)
           << ", now " << ceph_mds_state_name(state) << dendl;
 
+  mdlog->shutdown();
+
   // stop timers
   if (beacon_sender) {
     timer.cancel_event(beacon_sender);