From: Yan, Zheng Date: Fri, 27 Jun 2014 07:12:26 +0000 (+0800) Subject: mds: defer encoding/submitting log events to separate thread X-Git-Tag: v0.84~138^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7e1deb6ea9ca71f177bf958420b567da43a973c7;p=ceph.git mds: defer encoding/submitting log events to separate thread Signed-off-by: Yan, Zheng --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 1c9dbacc71d0..ecb46d45cc3b 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -313,6 +313,7 @@ OPTION(mds_default_dir_hash, OPT_INT, CEPH_STR_HASH_RJENKINS) OPTION(mds_log, OPT_BOOL, true) OPTION(mds_log_skip_corrupt_events, OPT_BOOL, false) OPTION(mds_log_max_events, OPT_INT, -1) +OPTION(mds_log_events_per_segment, OPT_INT, 1024) OPTION(mds_log_segment_size, OPT_INT, 0) // segment size for mds log, // defaults to g_default_file_layout.fl_object_size (4MB) OPTION(mds_log_max_segments, OPT_INT, 30) diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index 738fe656d649..194f5001f404 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -213,54 +213,35 @@ void MDLog::_submit_entry(LogEvent *le, Context *c) // let the event register itself in the segment assert(!segments.empty()); - le->_segment = segments.rbegin()->second; - le->_segment->num_events++; - le->update_segment(); + LogSegment *ls = segments.rbegin()->second; + ls->num_events++; + le->_segment = ls; + le->update_segment(); le->set_stamp(ceph_clock_now(g_ceph_context)); - - num_events++; - assert(!capped); - - // encode it, with event type - { - bufferlist bl; - le->encode_with_header(bl); - - dout(5) << "submit_entry " << journaler->get_write_pos() << "~" << bl.length() - << " : " << *le << dendl; - - // journal it. - journaler->append_entry(bl); // bl is destroyed. - } - le->_segment->end = journaler->get_write_pos(); + pending_events[ls->seq].push_back(PendingEvent(le, c)); + num_events++; if (logger) { logger->inc(l_mdl_evadd); logger->set(l_mdl_ev, num_events); - logger->set(l_mdl_wrpos, journaler->get_write_pos()); } unflushed++; - - if (c) - journaler->wait_for_flush(c); - // start a new segment? - // FIXME: should this go elsewhere? - uint64_t last_seg = get_last_segment_offset(); uint64_t period = journaler->get_layout_period(); - // start a new segment if there are none or if we reach end of last segment + // start a new segment? if (le->get_type() == EVENT_SUBTREEMAP || (le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) { // avoid infinite loop when ESubtreeMap is very large. // do not insert ESubtreeMap among EImportFinish events that finish // disambiguate imports. Because the ESubtreeMap reflects the subtree // state when all EImportFinish events are replayed. - } else if (journaler->get_write_pos()/period != last_seg/period) { - dout(10) << "submit_entry also starting new segment: last = " << last_seg - << ", cur pos = " << journaler->get_write_pos() << dendl; + } else if (ls->end/period != ls->offset/period || + ls->num_events >= g_conf->mds_log_events_per_segment) { + dout(10) << "submit_entry also starting new segment: last = " + << ls->seq << "/" << ls->offset << ", event seq = " << event_seq << dendl; _start_new_segment(); } else if (g_conf->mds_debug_subtrees && le->get_type() != EVENT_SUBTREEMAP_TEST) { @@ -271,8 +252,6 @@ void MDLog::_submit_entry(LogEvent *le, Context *c) sle->set_type(EVENT_SUBTREEMAP_TEST); _submit_entry(sle, NULL); } - - delete le; } void MDLog::_submit_thread() @@ -352,20 +331,43 @@ void MDLog::_submit_thread() void MDLog::wait_for_safe(Context *c) { - if (g_conf->mds_log) { - // wait - journaler->wait_for_flush(c); - } else { + if (!g_conf->mds_log) { // hack: bypass. c->complete(0); + return; + } + + submit_mutex.Lock(); + + bool no_pending = true; + if (!pending_events.empty()) { + pending_events.rbegin()->second.push_back(PendingEvent(NULL, c)); + no_pending = false; + submit_cond.Signal(); } + + submit_mutex.Unlock(); + + if (no_pending && c) + journaler->wait_for_flush(c); } void MDLog::flush() { - if (unflushed) - journaler->flush(); + submit_mutex.Lock(); + + bool do_flush = unflushed > 0; unflushed = 0; + if (!pending_events.empty()) { + pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true)); + do_flush = false; + submit_cond.Signal(); + } + + submit_mutex.Unlock(); + + if (do_flush) + journaler->flush(); } void MDLog::cap() @@ -408,7 +410,7 @@ void MDLog::_prepare_new_segment() uint64_t seq = event_seq + 1; dout(7) << __func__ << " seq " << seq << dendl; - segments[seq] = new LogSegment(seq, journaler->get_write_pos()); + segments[seq] = new LogSegment(seq); logger->inc(l_mdl_segadd); logger->set(l_mdl_seg, segments.size()); diff --git a/src/mds/MDLog.h b/src/mds/MDLog.h index ac288d20738a..a4d8d2537ce6 100644 --- a/src/mds/MDLog.h +++ b/src/mds/MDLog.h @@ -151,15 +151,10 @@ protected: friend class C_MDS_WroteImportMap; friend class MDCache; -public: uint64_t get_last_segment_seq() { assert(!segments.empty()); return segments.rbegin()->first; } - uint64_t get_last_segment_offset() { - assert(!segments.empty()); - return segments.rbegin()->first; - } LogSegment *get_oldest_segment() { return segments.begin()->second; } @@ -275,11 +270,13 @@ public: void submit_entry(LogEvent *e, Context *c = 0) { Mutex::Locker l(submit_mutex); _submit_entry(e, c); + submit_cond.Signal(); } void start_submit_entry(LogEvent *e, Context *c = 0) { Mutex::Locker l(submit_mutex); _start_entry(e); _submit_entry(e, c); + submit_cond.Signal(); } bool entry_is_open() { return cur_event != NULL; }