From 77ecf6a1246736e981add0fbf815a68baf21a3d7 Mon Sep 17 00:00:00 2001 From: sageweil Date: Tue, 24 Oct 2006 19:52:34 +0000 Subject: [PATCH] idallocator recovery replay working! git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@945 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/mds/IdAllocator.cc | 47 +++++++++++++------- ceph/mds/IdAllocator.h | 14 +++--- ceph/mds/LogEvent.h | 12 +++++- ceph/mds/MDLog.cc | 93 +++++++++++++++++++++++++++++++++++++--- ceph/mds/MDLog.h | 6 +++ ceph/mds/MDS.cc | 18 ++++---- ceph/mds/events/EAlloc.h | 22 +++++++--- 7 files changed, 171 insertions(+), 41 deletions(-) diff --git a/ceph/mds/IdAllocator.cc b/ceph/mds/IdAllocator.cc index cd3821cb1291d..7399b4792de79 100644 --- a/ceph/mds/IdAllocator.cc +++ b/ceph/mds/IdAllocator.cc @@ -29,7 +29,7 @@ #define dout(x) if (x <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".idalloc: " -idno_t IdAllocator::alloc_id() +idno_t IdAllocator::alloc_id(bool replay) { assert(is_active()); @@ -41,12 +41,13 @@ idno_t IdAllocator::alloc_id() version++; // log it - mds->mdlog->submit_entry(new EAlloc(IDTYPE_INO, id, EALLOC_EV_ALLOC, version)); + if (!replay) + mds->mdlog->submit_entry(new EAlloc(IDTYPE_INO, id, EALLOC_EV_ALLOC, version)); return id; } -void IdAllocator::reclaim_id(idno_t id) +void IdAllocator::reclaim_id(idno_t id, bool replay) { assert(is_active()); @@ -55,7 +56,8 @@ void IdAllocator::reclaim_id(idno_t id) version++; - mds->mdlog->submit_entry(new EAlloc(IDTYPE_INO, id, EALLOC_EV_FREE, version)); + if (!replay) + mds->mdlog->submit_entry(new EAlloc(IDTYPE_INO, id, EALLOC_EV_FREE, version)); } @@ -63,41 +65,54 @@ void IdAllocator::reclaim_id(idno_t id) class C_ID_Save : public Context { IdAllocator *ida; version_t version; - Context *onfinish; public: - C_ID_Save(IdAllocator *i, version_t v, Context *c) : ida(i), version(v), onfinish(c) {} + C_ID_Save(IdAllocator *i, version_t v) : ida(i), version(v) {} void finish(int r) { - ida->save_2(version, onfinish); + ida->save_2(version); } }; -void IdAllocator::save(Context *onfinish) +void IdAllocator::save(Context *onfinish, version_t v) { + if (v > 0 && v <= committing_version) { + dout(10) << "save v " << version << " - already saving " + << committing_version << " >= needed " << v << endl; + waitfor_save[v].push_back(onfinish); + return; + } + dout(10) << "save v " << version << endl; assert(is_active()); - + bufferlist bl; bl.append((char*)&version, sizeof(version)); ::_encode(free.m, bl); + committing_version = version; + + waitfor_save[version].push_back(onfinish); + // write (async) mds->filer->write(inode, 0, bl.length(), bl, 0, - 0, new C_ID_Save(this, version, onfinish)); + 0, new C_ID_Save(this, version)); } -void IdAllocator::save_2(version_t v, Context *onfinish) +void IdAllocator::save_2(version_t v) { dout(10) << "save_2 v " << v << endl; - + committed_version = v; - - if (onfinish) { - onfinish->finish(0); - delete onfinish; + + list ls; + while (!waitfor_save.empty()) { + if (waitfor_save.begin()->first > v) break; + ls.splice(ls.end(), waitfor_save.begin()->second); + waitfor_save.erase(waitfor_save.begin()); } + finish_contexts(ls,0); } diff --git a/ceph/mds/IdAllocator.h b/ceph/mds/IdAllocator.h index c04d7e60ae1a8..745d863be99d3 100644 --- a/ceph/mds/IdAllocator.h +++ b/ceph/mds/IdAllocator.h @@ -35,22 +35,24 @@ class IdAllocator { //static const int STATE_COMMITTING = 3; int state; - version_t version, committed_version; + version_t version, committing_version, committed_version; interval_set free; // unused ids + map > waitfor_save; + public: IdAllocator(MDS *m, inode_t i) : mds(m), inode(i), state(STATE_UNDEF), - version(0), committed_version(0) + version(0), committing_version(0), committed_version(0) { } // alloc or reclaim ids - idno_t alloc_id(); - void reclaim_id(idno_t id); + idno_t alloc_id(bool replay=false); + void reclaim_id(idno_t id, bool replay=false); version_t get_version() { return version; } version_t get_committed_version() { return committed_version; } @@ -61,8 +63,8 @@ class IdAllocator { bool is_opening() { return state == STATE_OPENING; } void reset(); - void save(Context *onfinish=0); - void save_2(version_t v, Context *onfinish); + void save(Context *onfinish=0, version_t need=0); + void save_2(version_t v); void shutdown() { if (is_active()) save(0); diff --git a/ceph/mds/LogEvent.h b/ceph/mds/LogEvent.h index d69f17111414a..b2f29735d8d6f 100644 --- a/ceph/mds/LogEvent.h +++ b/ceph/mds/LogEvent.h @@ -48,6 +48,11 @@ class LogEvent { static LogEvent *decode(bufferlist &bl); + virtual void print(ostream& out) { + out << "event(" << _type << ")"; + } + + /*** live journal ***/ /* obsolete() - is this entry committed to primary store, such that @@ -74,8 +79,13 @@ class LogEvent { /* replay() - replay given event */ - virtual void replay(MDS *m, Context *c) { assert(0); } + virtual void replay(MDS *m) { assert(0); } }; +inline ostream& operator<<(ostream& out, LogEvent& le) { + le.print(out); + return out; +} + #endif diff --git a/ceph/mds/MDLog.cc b/ceph/mds/MDLog.cc index 4b0e77396ffb5..e8061a98fe56c 100644 --- a/ceph/mds/MDLog.cc +++ b/ceph/mds/MDLog.cc @@ -100,8 +100,8 @@ void MDLog::write_head(Context *c) void MDLog::submit_entry( LogEvent *e, Context *c ) { - dout(5) << "submit_entry at " << num_events << endl; - + dout(5) << "submit_entry " << journaler->get_write_pos() << " : " << *e << endl; + if (g_conf.mds_log) { // encode it, with event type bufferlist bl; @@ -196,7 +196,7 @@ void MDLog::_did_read() void MDLog::_trimmed(LogEvent *le) { - dout(7) << " trimmed " << le << endl; + dout(7) << " trimmed " << *le << endl; assert(le->can_expire(mds)); @@ -246,14 +246,14 @@ void MDLog::trim(Context *c) // we just read an event. if (le->can_expire(mds) == true) { // obsolete - dout(7) << "trim obsolete " << le << endl; + dout(7) << "trim obsolete " << *le << endl; delete le; logger->inc("obs"); } else { assert ((int)trimming.size() < g_conf.mds_log_max_trimming); // trim! - dout(7) << "trim trimming " << le << endl; + dout(7) << "trim trimming " << *le << endl; trimming[le->_end_off] = le; le->retire(mds, new C_MDL_Trimmed(this, le)); logger->inc("retire"); @@ -286,3 +286,86 @@ void MDLog::trim(Context *c) } +void MDLog::replay(Context *c) +{ + assert(journaler->is_active()); + + // start reading at the last known expire point. + journaler->set_read_pos( journaler->get_expire_pos() ); + + // empty? + if (journaler->get_read_pos() == journaler->get_write_pos()) { + dout(10) << "replay - journal empty, done." << endl; + if (c) { + c->finish(0); + delete c; + } + return; + } + + // add waiter + if (c) + waitfor_replay.push_back(c); + + // go! + dout(10) << "replay start, from " << journaler->get_read_pos() + << " to " << journaler->get_write_pos() << endl; + + assert(num_events == 0); + + _replay(); +} + +class C_MDL_Replay : public Context { + MDLog *mdlog; +public: + C_MDL_Replay(MDLog *l) : mdlog(l) {} + void finish(int r) { mdlog->_replay(); } +}; + +void MDLog::_replay() +{ + // read what's buffered + while (journaler->is_readable() && + journaler->get_read_pos() < journaler->get_write_pos()) { + // read it + off_t pos = journaler->get_read_pos(); + bufferlist bl; + bool r = journaler->try_read_entry(bl); + assert(r); + + // unpack event + LogEvent *le = LogEvent::decode(bl); + num_events++; + + if (le->has_happened(mds)) { + dout(10) << "_replay " << pos << " / " << journaler->get_write_pos() + << " : " << *le << " : already happened" << endl; + } else { + dout(10) << "_replay " << pos << " / " << journaler->get_write_pos() + << " : " << *le << " : applying" << endl; + le->replay(mds); + } + delete le; + } + + // wait for read? + if (journaler->get_read_pos() < journaler->get_write_pos()) { + journaler->wait_for_readable(new C_MDL_Replay(this)); + return; + } + + // done! + assert(journaler->get_read_pos() == journaler->get_write_pos()); + dout(10) << "_replay - complete" << endl; + + // move read pointer _back_ to expire pos, for eventual trimming + journaler->set_read_pos(journaler->get_expire_pos()); + + // kick waiter(s) + list ls; + ls.swap(waitfor_replay); + finish_contexts(ls,0); +} + + diff --git a/ceph/mds/MDLog.h b/ceph/mds/MDLog.h index 7ff64f81d9045..37329a164e781 100644 --- a/ceph/mds/MDLog.h +++ b/ceph/mds/MDLog.h @@ -50,6 +50,7 @@ class MDLog { inode_t log_inode; Journaler *journaler; + //hash_map trimming; // events currently being trimmed map trimming; @@ -61,6 +62,8 @@ class MDLog { Logger *logger; + list waitfor_replay; + public: MDLog(MDS *m); ~MDLog(); @@ -80,6 +83,9 @@ class MDLog { void reset(); // fresh, empty log! void open(Context *onopen); void write_head(Context *onfinish); + + void replay(Context *onfinish); + void _replay(); }; #endif diff --git a/ceph/mds/MDS.cc b/ceph/mds/MDS.cc index bd0bf4d3ca5d0..6e7f50775e1b6 100644 --- a/ceph/mds/MDS.cc +++ b/ceph/mds/MDS.cc @@ -339,8 +339,8 @@ void MDS::boot_recover(int step) case 4: dout(2) << "boot_recover " << step << ": replaying mds log" << endl; - - //break; + mdlog->replay(new C_MDS_BootRecover(this, 5)); + break; case 5: dout(2) << "boot_recover " << step << ": done." << endl; @@ -493,11 +493,13 @@ void MDS::my_dispatch(Message *m) */ - // flush log to disk after every op. for now. - mdlog->flush(); + if (is_active()) { + // flush log to disk after every op. for now. + mdlog->flush(); - // trim cache - mdcache->trim(); + // trim cache + mdcache->trim(); + } // finish any triggered contexts if (finished_queue.size()) { @@ -508,7 +510,6 @@ void MDS::my_dispatch(Message *m) finish_contexts(ls); } - // hash root? @@ -525,7 +526,8 @@ void MDS::my_dispatch(Message *m) // periodic crap (1-second resolution) static utime_t last_log = g_clock.recent_now(); utime_t now = g_clock.recent_now(); - if (last_log.sec() != now.sec()) { + if (is_active() && + last_log.sec() != now.sec()) { // log last_log = now; diff --git a/ceph/mds/events/EAlloc.h b/ceph/mds/events/EAlloc.h index 5cdfa4355cf99..9959e5704bfbe 100644 --- a/ceph/mds/events/EAlloc.h +++ b/ceph/mds/events/EAlloc.h @@ -46,6 +46,14 @@ class EAlloc : public LogEvent { } + void print(ostream& out) { + if (what == EALLOC_EV_ALLOC) + out << "alloc " << hex << id << dec << " tablev " << table_version; + else + out << "dealloc " << hex << id << dec << " tablev " << table_version; + } + + // live journal bool can_expire(MDS *mds) { if (mds->idalloc->get_committed_version() <= table_version) @@ -55,24 +63,28 @@ class EAlloc : public LogEvent { } void retire(MDS *mds, Context *c) { - mds->idalloc->save(c); + mds->idalloc->save(c, table_version); } // recovery bool has_happened(MDS *mds) { - return (mds->idalloc->get_version() >= table_version); + if (mds->idalloc->get_version() >= table_version) { + cout << " event " << table_version << " <= table " << mds->idalloc->get_version() << endl; + return true; + } else + return false; } - void replay(MDS *mds, Context *c) { + void replay(MDS *mds) { assert(table_version-1 == mds->idalloc->get_version()); if (what == EALLOC_EV_ALLOC) { - idno_t nid = mds->idalloc->alloc_id(); + idno_t nid = mds->idalloc->alloc_id(true); assert(nid == id); // this should match. } else if (what == EALLOC_EV_FREE) { - mds->idalloc->reclaim_id(id); + mds->idalloc->reclaim_id(id, true); } else assert(0); -- 2.39.5