From 167431dbbf693c2d3a13bac8423c7594aeea9031 Mon Sep 17 00:00:00 2001
From: sageweil
Date: Sun, 9 Sep 2007 01:35:27 +0000
Subject: [PATCH] segment-based trimming; balancer does queued exports

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1788 29311d96-e01e-0410-9327-a35deaab8ce9
---
 trunk/ceph/Makefile          | 16 ++++-----
 trunk/ceph/include/buffer.h  |  1 +
 trunk/ceph/mds/MDBalancer.cc |  8 ++---
 trunk/ceph/mds/MDCache.cc    |  6 ++--
 trunk/ceph/mds/MDLog.cc      | 66 ++++++++++++++++++++++++++++++------
 trunk/ceph/mds/MDLog.h       | 50 +++++++++++++++------------
 trunk/ceph/mds/Migrator.cc   | 35 ++++++++++++++++++-
 trunk/ceph/mds/Migrator.h    |  4 +++
 trunk/ceph/mds/journal.cc    | 56 ++++++++++++++++++++++++------
 trunk/ceph/osd/OSDMap.h      |  8 ++---
 10 files changed, 187 insertions(+), 63 deletions(-)

diff --git a/trunk/ceph/Makefile b/trunk/ceph/Makefile
index dccaa2e16c7a0..8b4f83925b524 100644
--- a/trunk/ceph/Makefile
+++ b/trunk/ceph/Makefile
@@ -13,10 +13,10 @@
 # on issdm, it's /usr/local/mpich2/bin.
 
 # Hook for extra -I options, etc.
-EXTRA_CFLAGS = #-I${HOME}/include -L${HOME}/lib
+EXTRA_CFLAGS = -pg -g #-I${HOME}/include -L${HOME}/lib
 
 # base
-CFLAGS = -O3 -pg -g -Wall -I. -D_FILE_OFFSET_BITS=64 -D_REENTRANT -D_THREAD_SAFE ${EXTRA_CFLAGS}
+CFLAGS = -Wall -I. -D_FILE_OFFSET_BITS=64 -D_REENTRANT -D_THREAD_SAFE ${EXTRA_CFLAGS}
 LDINC = ld -i -o
 CC = g++
 LIBS = -pthread
@@ -186,26 +186,26 @@ ipc_testclient: ceph_ipc/ipc_testclient.cc ceph_ipc/ipc_client.o
 
 # fake*
 fakefuse: fakefuse.cc mon.o mds.o client.o osd.o osdc.o ebofs.o client/fuse.o client/fuse_ll.o msg/FakeMessenger.o common.o
-	${CC} -pg ${CFLAGS} ${LIBS} -lfuse $^ -o $@
+	${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
 fakesyn: fakesyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o
-	${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
+	${CC} ${CFLAGS} ${LIBS} $^ -o $@
 
 # mpi startup
 newsyn: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/SimpleMessenger.o common.o
-	${MPICC} -pg ${MPICFLAGS} ${MPILIBS} $^ -o $@
+	${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
 # ebofs
 mkfs.ebofs: ebofs/mkfs.ebofs.cc config.cc common/Clock.o ebofs.o
-	${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
+	${CC} ${CFLAGS} ${LIBS} $^ -o $@
 
 test.ebofs: ebofs/test.ebofs.cc config.cc common/Clock.o ebofs.o
-	${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
+	${CC} ${CFLAGS} ${LIBS} $^ -o $@
 
 dupstore: dupstore.cc config.cc ebofs.o common/Clock.o common/Timer.o osd/FakeStore.o
-	${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
+	${CC} ${CFLAGS} ${LIBS} $^ -o $@
 
 
 # hadoop
diff --git a/trunk/ceph/include/buffer.h b/trunk/ceph/include/buffer.h
index 088d3fff560d8..3f3650f4162e2 100644
--- a/trunk/ceph/include/buffer.h
+++ b/trunk/ceph/include/buffer.h
@@ -471,6 +471,7 @@ public:
     iterator& operator++() {
       assert(p != ls.end());
       advance(1);
+      return *this;
     }
 
     // copy data out.
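The buffer.h hunk above supplies the return statement that the bufferlist iterator's pre-increment was missing: a function declared to return iterator& that falls off the end without returning has undefined behavior. A minimal standalone illustration of the required shape (a toy type invented for this note, not the bufferlist API):

// Toy pre-increment: advance, then return a reference to *this.
// Without the return statement the program has undefined behavior.
struct toy_iterator {
  const char *p;
  toy_iterator& operator++() {
    ++p;            // advance one position
    return *this;   // the statement the patch adds, in spirit
  }
  char operator*() const { return *p; }
};
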
diff --git a/trunk/ceph/mds/MDBalancer.cc b/trunk/ceph/mds/MDBalancer.cc
index 72633f242aa65..3619c8fa47f3d 100644
--- a/trunk/ceph/mds/MDBalancer.cc
+++ b/trunk/ceph/mds/MDBalancer.cc
@@ -490,7 +490,7 @@ void MDBalancer::do_rebalance(int beat)
         dout(-5) << " exporting idle (" << pop << ") import " << *im
                  << " back to mds" << im->inode->authority().first
                  << dendl;
-        mds->mdcache->migrator->export_dir(im, im->inode->authority().first);
+        mds->mdcache->migrator->export_dir_nicely(im, im->inode->authority().first);
         continue;
       }
 
@@ -556,7 +556,7 @@ void MDBalancer::do_rebalance(int beat)
           dout(-5) << "reexporting " << *dir
                    << " pop " << pop
                    << " back to mds" << target << dendl;
-          mds->mdcache->migrator->export_dir(dir, target);
+          mds->mdcache->migrator->export_dir_nicely(dir, target);
           have += pop;
           import_from_map.erase(plast);
           import_pop_map.erase(pop);
@@ -586,7 +586,7 @@ void MDBalancer::do_rebalance(int beat)
                << " back to mds" << imp->inode->authority()
                << dendl;
       have += pop;
-      mds->mdcache->migrator->export_dir(imp, imp->inode->authority().first);
+      mds->mdcache->migrator->export_dir_nicely(imp, imp->inode->authority().first);
     }
     if (amount-have < MIN_OFFLOAD) break;
   }
@@ -621,7 +621,7 @@ void MDBalancer::do_rebalance(int beat)
               << " to mds" << target
               << " " << **it
               << dendl;
-      mds->mdcache->migrator->export_dir(*it, target);
+      mds->mdcache->migrator->export_dir_nicely(*it, target);
     }
   }
 
diff --git a/trunk/ceph/mds/MDCache.cc b/trunk/ceph/mds/MDCache.cc
index a4161b718a537..dc9ba7506c9e9 100644
--- a/trunk/ceph/mds/MDCache.cc
+++ b/trunk/ceph/mds/MDCache.cc
@@ -1039,12 +1039,10 @@ void MDCache::log_subtree_map(Context *onsync)
 void MDCache::_logged_subtree_map(off_t off)
 {
   dout(10) << "_logged_subtree_map at " << off << dendl;
-  mds->mdlog->last_subtree_map = off;
+  mds->mdlog->subtree_maps.insert(off);
   mds->mdlog->writing_subtree_map = false;
 
-  list<Context*> ls;
-  mds->mdlog->take_subtree_map_expire_waiters(ls);
-  mds->queue_waiters(ls);
+  mds->mdlog->kick_subtree_map();  // just in case the last segment was empty.
 }
 
diff --git a/trunk/ceph/mds/MDLog.cc b/trunk/ceph/mds/MDLog.cc
index 21385d237a5c7..a07ad0a6649ea 100644
--- a/trunk/ceph/mds/MDLog.cc
+++ b/trunk/ceph/mds/MDLog.cc
@@ -146,12 +146,12 @@ void MDLog::submit_entry( LogEvent *le, Context *c )
 
   // should we log a new import_map?
   // FIXME: should this go elsewhere?
-  if (last_subtree_map && !writing_subtree_map &&
+  if (!writing_subtree_map &&
       (journaler->get_write_pos() / log_inode.layout.period()) !=
-      (last_subtree_map / log_inode.layout.period()) &&
-      (journaler->get_write_pos() - last_subtree_map > log_inode.layout.period()/2)) {
+      (get_last_subtree_map_offset() / log_inode.layout.period()) &&
+      (journaler->get_write_pos() - get_last_subtree_map_offset() > log_inode.layout.period()/2)) {
     // log import map
-    dout(10) << "submit_entry also logging subtree map: last = " << last_subtree_map
+    dout(10) << "submit_entry also logging subtree map: last = " << get_last_subtree_map_offset()
              << ", cur pos = " << journaler->get_write_pos() << dendl;
     mds->mdcache->log_subtree_map();
   }
@@ -187,7 +187,12 @@ void MDLog::flush()
   trim(NULL);
 }
 
-
+void MDLog::cap()
+{
+  dout(5) << "cap" << dendl;
+  capped = true;
+  kick_subtree_map();
+}
 
 
 // trim
@@ -236,16 +241,55 @@ void MDLog::_trimmed(LogEvent *le)
 {
   dout(7) << "trimmed : " << le->get_start_off() << " : " << *le << dendl;
 
-  if (trimming.begin()->first == le->_end_off) {
-    // we trimmed off the front!
+  bool kick = false;
+
+  map<off_t,LogEvent*>::iterator p = trimming.begin();
+  if (p->first == le->_start_off) {
+    // we trimmed off the front!  it must have been a segment head.
+    assert(!subtree_maps.empty());
+    assert(p->first == *subtree_maps.begin());
+    subtree_maps.erase(subtree_maps.begin());
+
     // we can expire the log a bit.
-    journaler->set_expire_pos(le->_end_off);
+    off_t to = get_trimmed_to();
+    journaler->set_expire_pos(to);
     journaler->trim();
+
+    kick = true;
+  } else {
+    p++;
+
+    // is the next one us?
+    if (le->_start_off == p->first) {
+      p++;
+
+      // did we empty a segment?
+      if (subtree_maps.size() >= 2) {
+        set<off_t>::iterator segp = subtree_maps.begin();
+        assert(*segp < le->_end_off);
+        segp++;
+        dout(20) << "i ended at " << le->get_end_off()
+                 << ", next seg starts at " << *segp
+                 << ", next trimming is " << (p == trimming.end() ? 0:p->first)
+                 << dendl;
+        if (*segp >= le->_end_off &&
+            (p == trimming.end() ||
+             p->first >= *segp)) {
+          dout(10) << "_trimmed segment looks empty" << dendl;
+          kick = true;
+        }
+      } else if (capped && trimming.size() < 3) {
+        kick = true;   // blech, imprecise
+      }
+    }
   }
 
-  trimming.erase(le->_end_off);
+  trimming.erase(le->_start_off);
   delete le;
-
+
+  if (kick)
+    kick_subtree_map();
+
   logger->inc("trimf");
   logger->set("trimng", trimming.size());
   logger->set("rdpos", journaler->get_read_pos());
@@ -305,7 +349,7 @@ void MDLog::trim(Context *c)
 
     // trim!
     dout(7) << "trim expiring : " << le->get_start_off() << " : " << *le << dendl;
-    trimming[le->_end_off] = le;
+    trimming[le->_start_off] = le;
     le->expire(mds, new C_MDL_Trimmed(this, le));
     logger->inc("trims");
     logger->set("trimng", trimming.size());
diff --git a/trunk/ceph/mds/MDLog.h b/trunk/ceph/mds/MDLog.h
index a2dd4c02a89d3..8fc45b35529c7 100644
--- a/trunk/ceph/mds/MDLog.h
+++ b/trunk/ceph/mds/MDLog.h
@@ -67,6 +67,13 @@ class MDLog {
   bool waiting_for_read;
 
   friend class C_MDL_Reading;
+
+  off_t get_trimmed_to() {
+    if (trimming.empty())
+      return get_read_pos();
+    else
+      return trimming.begin()->first;
+  }
 
 
   // -- replay --
@@ -93,35 +100,42 @@ class MDLog {
 
 
   // -- subtreemaps --
-  off_t last_subtree_map;   // offsets of last committed subtreemap.  constrains trimming.
-  list<Context*> subtree_map_expire_waiters;
+  set<off_t> subtree_maps;
+  map<off_t, list<Context*> > subtree_map_expire_waiters;
   bool writing_subtree_map;   // one is being written now
   bool seen_subtree_map;      // for recovery
 
+  friend class ESubtreeMap;
   friend class C_MDS_WroteImportMap;
   friend class MDCache;
 
-  void init_journaler();
-
- public:
-  void reopen_logger(utime_t start);
-
-  off_t get_last_subtree_map_offset() { return last_subtree_map; }
-  void add_subtree_map_expire_waiter(Context *c) {
-    subtree_map_expire_waiters.push_back(c);
-  }
-  void take_subtree_map_expire_waiters(list<Context*>& ls) {
-    ls.splice(ls.end(), subtree_map_expire_waiters);
+  void kick_subtree_map() {
+    if (subtree_map_expire_waiters.empty()) return;
+    list<Context*> ls;
+    ls.swap(subtree_map_expire_waiters.begin()->second);
+    subtree_map_expire_waiters.erase(subtree_map_expire_waiters.begin());
+    finish_contexts(ls);
   }
 
+public:
+  off_t get_last_subtree_map_offset() {
+    assert(!subtree_maps.empty());
+    return *subtree_maps.rbegin();
+  }
+private:
+  void init_journaler();
+
+public:
+  void reopen_logger(utime_t start);
+
   // replay state
   map > pending_exports;
 
- public:
+public:
   MDLog(MDS *m) : mds(m),
                   num_events(0), max_events(g_conf.mds_log_max_len),
                   unflushed(0),
@@ -130,7 +144,6 @@ class MDLog {
                   logger(0),
                   trim_reading(false), waiting_for_read(false),
                   replay_thread(this),
-                  last_subtree_map(0),
                   writing_subtree_map(false), seen_subtree_map(false) {
   }
   ~MDLog();
@@ -148,12 +161,7 @@ class MDLog {
   }
 
   bool is_capped() { return capped; }
-  void cap() {
-    capped = true;
-    list<Context*> ls;
-    ls.swap(subtree_map_expire_waiters);
-    finish_contexts(ls);
-  }
+  void cap();
 
   void submit_entry( LogEvent *e, Context *c = 0 );
   void wait_for_sync( Context *c );
diff --git a/trunk/ceph/mds/Migrator.cc b/trunk/ceph/mds/Migrator.cc
index d84caa51f02c4..bc29c479ef65e 100644
--- a/trunk/ceph/mds/Migrator.cc
+++ b/trunk/ceph/mds/Migrator.cc
@@ -260,8 +260,10 @@ void Migrator::handle_mds_failure_or_stop(int who)
 
       // send pending import_maps?  (these need to go out when all exports have finished.)
       cache->maybe_send_pending_resolves();
-
+
       cache->show_subtrees();
+
+      maybe_do_queued_export();
     }
   } else {
     // bystander failed.
@@ -486,6 +488,35 @@ void Migrator::audit()
 // ==========================================================
 // EXPORT
 
+void Migrator::export_dir_nicely(CDir *dir, int dest)
+{
+  // enqueue
+  dout(7) << "export_dir_nicely " << *dir << " to " << dest << dendl;
+  export_queue.push_back(pair<dirfrag_t,int>(dir->dirfrag(), dest));
+
+  maybe_do_queued_export();
+}
+
+void Migrator::maybe_do_queued_export()
+{
+  while (!export_queue.empty() &&
+         export_state.size() <= 2) {
+    dirfrag_t df = export_queue.front().first;
+    int dest = export_queue.front().second;
+    export_queue.pop_front();
+
+    CDir *dir = mds->mdcache->get_dirfrag(df);
+    if (!dir) continue;
+    if (!dir->is_auth()) continue;
+
+    dout(-7) << "nicely exporting " << *dir << " to " << dest << dendl;
+
+    export_dir(dir, dest);
+  }
+}
+
+
+
 class C_MDC_ExportFreeze : public Context {
   Migrator *mig;
@@ -1240,6 +1271,8 @@ void Migrator::export_finish(CDir *dir)
 
   // send pending import_maps?
   mds->mdcache->maybe_send_pending_resolves();
+
+  maybe_do_queued_export();
 }
 
diff --git a/trunk/ceph/mds/Migrator.h b/trunk/ceph/mds/Migrator.h
index 421859bea3974..5af7a42ce38f6 100644
--- a/trunk/ceph/mds/Migrator.h
+++ b/trunk/ceph/mds/Migrator.h
@@ -84,6 +84,7 @@ protected:
   map > export_finish_waiters;
 
+  list< pair<dirfrag_t,int> >  export_queue;
 
   // -- imports --
 public:
@@ -179,6 +180,9 @@ public:
   void export_dir(CDir *dir, int dest);
   void export_empty_import(CDir *dir);
 
+  void export_dir_nicely(CDir *dir, int dest);
+  void maybe_do_queued_export();
+
   void encode_export_inode(CInode *in, bufferlist& enc_state, int newauth,
                            map& exported_client_map,
                            utime_t now);
diff --git a/trunk/ceph/mds/journal.cc b/trunk/ceph/mds/journal.cc
index 1af4feae08c94..fa41f246ceef2 100644
--- a/trunk/ceph/mds/journal.cc
+++ b/trunk/ceph/mds/journal.cc
@@ -861,28 +861,64 @@ void ESlaveUpdate::replay(MDS *mds)
 
 bool ESubtreeMap::has_expired(MDS *mds)
 {
-  if (mds->mdlog->get_last_subtree_map_offset() > get_start_off()) {
-    dout(10) << "ESubtreeMap.has_expired -- there's a newer map" << dendl;
-    return true;
-  } else if (mds->mdlog->is_capped()) {
-    dout(10) << "ESubtreeMap.has_expired -- log is capped, allowing map to expire" << dendl;
+  assert(!mds->mdlog->subtree_maps.empty());
+  set<off_t>::iterator p = mds->mdlog->subtree_maps.begin();
+  off_t first = *p;
+  if (get_start_off() != first) {
+    dout(10) << "ESubtreeMap.has_expired -- not the oldest segment" << dendl;
+    return false;
+  }
+
+  // i am the oldest.
+
+  // capped and last event?
+  if (mds->mdlog->is_capped() &&
+      mds->mdlog->subtree_maps.size() == 1 &&
+      (mds->mdlog->trimming.empty() ||
+       (mds->mdlog->trimming.size() == 1 &&
+        mds->mdlog->trimming.begin()->second == this))) {
+    dout(10) << "ESubtreeMap.has_expired -- capped and last one" << dendl;
     return true;
-  } else {
-    dout(10) << "ESubtreeMap.has_expired -- not until there's a newer map written"
-             << " (" << get_start_off() << " >= " << mds->mdlog->get_last_subtree_map_offset() << ")"
-             << dendl;
+  }
+
+  p++;
+  if (p == mds->mdlog->subtree_maps.end()) {
+    dout(10) << "ESubtreeMap.has_expired -- only segment" << dendl;
     return false;
   }
+  off_t next = *p;
+
+  if (mds->mdlog->get_read_pos() < next) {
+    dout(10) << "ESubtreeMap.has_expired -- haven't read this segment, read pos "
+             << mds->mdlog->get_read_pos() << " < next map at " << next
+             << dendl;
+    return false;
+  }
+
+  map<off_t,LogEvent*>::iterator trimp = mds->mdlog->trimming.begin();
+  assert(trimp->first == get_start_off());
+  trimp++;
+  if (trimp != mds->mdlog->trimming.end() &&
+      trimp->first < next) {
+    dout(10) << "ESubtreeMap.has_expired -- segment still trimming at " << trimp->first << dendl;
+    return false;
+  }
+
+  dout(10) << "ESubtreeMap.has_expired -- segment is empty" << dendl;
+  return true;
 }
 
 void ESubtreeMap::expire(MDS *mds, Context *c)
 {
   dout(10) << "ESubtreeMap.has_expire -- waiting for a newer map to be written (or for shutdown)" << dendl;
-  mds->mdlog->add_subtree_map_expire_waiter(c);
+  mds->mdlog->subtree_map_expire_waiters[get_start_off()].push_back(c);
 }
 
 void ESubtreeMap::replay(MDS *mds)
 {
+  // note location
+  mds->mdlog->subtree_maps.insert(get_start_off());
+
   if (mds->mdcache->is_subtrees()) {
     dout(10) << "ESubtreeMap.replay -- ignoring, already have import map" << dendl;
   } else {
diff --git a/trunk/ceph/osd/OSDMap.h b/trunk/ceph/osd/OSDMap.h
index 59e108c937aaf..144c72c0f59d8 100644
--- a/trunk/ceph/osd/OSDMap.h
+++ b/trunk/ceph/osd/OSDMap.h
@@ -175,10 +175,10 @@ private:
   int num_osds() { return osds.size(); }
   void get_all_osds(set& ls) { ls = osds;
   }
-  const set& get_osds() { return osds; }
-  const map& get_down_osds() { return down_osds; }
-  const set& get_out_osds() { return out_osds; }
-  const map& get_overload_osds() { return overload_osds; }
+  const set& get_osds() { return osds; }
+  const map& get_down_osds() { return down_osds; }
+  const set& get_out_osds() { return out_osds; }
+  const map& get_overload_osds() { return overload_osds; }
 
   bool exists(int osd) { return osds.count(osd); }
   bool is_down(int osd) { return down_osds.count(osd); }
-- 
2.39.5
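
Two ideas in this patch are easier to see outside the diff context. First, segment-based trimming: the journal is treated as a sequence of segments, each headed by a logged subtree map (the offsets in MDLog::subtree_maps), events being expired are keyed by their start offset in MDLog::trimming, and the journal's expire position only advances once the head of the oldest segment has finished expiring. The following is a minimal standalone sketch of that bookkeeping under those assumptions; SegmentedLog, Event, and the offsets in main() are invented for illustration and are not the MDLog/Journaler API.

// Toy model of segment-based journal trimming (illustrative only).
#include <cassert>
#include <iostream>
#include <map>
#include <set>

struct Event {
  long start_off;   // where the event begins in the journal
  long end_off;     // where it ends
};

class SegmentedLog {
  std::set<long> segment_heads;     // offsets of logged subtree maps
  std::map<long, Event> trimming;   // events being expired, keyed by start offset
  long expire_pos = 0;              // journal space before this can be reclaimed

public:
  void add_segment(long off) { segment_heads.insert(off); }
  void start_trim(const Event &e) { trimming[e.start_off] = e; }

  // Called when an event has finished expiring.
  void finished(const Event &e) {
    std::map<long, Event>::iterator p = trimming.begin();
    if (p->first == e.start_off) {
      // The oldest in-flight event finished; in this toy (mirroring the
      // patch's invariant) it is a segment head, so space up to the next
      // pending event becomes reclaimable.
      assert(!segment_heads.empty() && *segment_heads.begin() == e.start_off);
      segment_heads.erase(segment_heads.begin());
      trimming.erase(p);
      expire_pos = trimming.empty() ? e.end_off : trimming.begin()->first;
    } else {
      trimming.erase(e.start_off);  // finished out of order; wait for the head
    }
    std::cout << "expire_pos now " << expire_pos << "\n";
  }
};

int main() {
  SegmentedLog log;
  log.add_segment(0);
  Event head = {0, 100}, tail = {100, 250};
  log.start_trim(head);
  log.start_trim(tail);
  log.finished(tail);   // out of order: expire_pos stays at 0
  log.finished(head);   // segment head done: expire_pos advances (to 100 here)
  return 0;
}

Second, queued ("nice") exports: MDBalancer now calls Migrator::export_dir_nicely(), which only enqueues the request, and Migrator::maybe_do_queued_export() drains the queue while few exports are in flight (export_state.size() <= 2 in the patch), being re-run whenever an export finishes or an exporting peer is handled as failed or stopped. A rough sketch of that throttling pattern, again with invented names and an int payload standing in for the dirfrag_t/target pair:

// Toy export throttle: queue requests, start at most max_in_flight at once.
#include <deque>
#include <iostream>
#include <set>

class ExportThrottle {
  std::deque<int> queue;       // pending export targets (toy payload)
  std::set<int> in_flight;     // exports currently running
  const unsigned max_in_flight = 3;

  void start(int target) {
    in_flight.insert(target);
    std::cout << "exporting " << target << "\n";
  }

public:
  // Analogue of export_dir_nicely(): enqueue, then try to make progress.
  void export_nicely(int target) {
    queue.push_back(target);
    maybe_do_queued();
  }

  // Analogue of maybe_do_queued_export().
  void maybe_do_queued() {
    while (!queue.empty() && in_flight.size() < max_in_flight) {
      int target = queue.front();
      queue.pop_front();
      start(target);
    }
  }

  // Analogue of export_finish(): a freed slot pulls the next queued export.
  void export_finished(int target) {
    in_flight.erase(target);
    maybe_do_queued();
  }
};

int main() {
  ExportThrottle t;
  for (int i = 0; i < 5; i++)
    t.export_nicely(i);   // only the first three start; two stay queued
  t.export_finished(0);   // finishing one starts the next queued export
  return 0;
}

Both sketches use placeholder types and limits; the patch itself keys everything off off_t journal offsets and dirfrag_t/int pairs, and bounds concurrency with export_state.size().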