// Debug-output helper for this file. When level x is at or below
// g_conf.debug_mds it streams a prefix (current g_clock time, the MDS node id,
// ".idalloc: ") to cout; the caller appends the message with further <<.
// Expands to a bare `if`, so it needs `mds` in scope at the call site and
// should not be used directly in front of an `else`.
#define dout(x) if (x <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".idalloc: "
// Allocate an id, bump the table version and return the id.
// replay: when true, the EAlloc ALLOC journal entry is not submitted.
//         EAlloc::replay() passes true, so re-applying a journaled event does
//         not write a second journal entry for it.
// Requires the allocator to be active (asserted).
// NOTE(review): the statements that choose `id` are not part of this hunk.
-idno_t IdAllocator::alloc_id()
+idno_t IdAllocator::alloc_id(bool replay)
{
assert(is_active());
version++;
// log it
- mds->mdlog->submit_entry(new EAlloc(IDTYPE_INO, id, EALLOC_EV_ALLOC, version));
+ if (!replay)
+ mds->mdlog->submit_entry(new EAlloc(IDTYPE_INO, id, EALLOC_EV_ALLOC, version));
return id;
}
// Give an id back to the table and bump the table version.
// replay: when true, the EAlloc FREE journal entry is not submitted
//         (EAlloc::replay() passes true).
// Requires the allocator to be active (asserted).
// NOTE(review): the statement that returns `id` to the free set is not part
// of this hunk.
-void IdAllocator::reclaim_id(idno_t id)
+void IdAllocator::reclaim_id(idno_t id, bool replay)
{
assert(is_active());
version++;
- mds->mdlog->submit_entry(new EAlloc(IDTYPE_INO, id, EALLOC_EV_FREE, version));
+ if (!replay)
+ mds->mdlog->submit_entry(new EAlloc(IDTYPE_INO, id, EALLOC_EV_FREE, version));
}
// Completion context handed to the filer write issued by IdAllocator::save().
// When finished it calls ida->save_2(version) with the table version that was
// written. After this change it no longer carries the caller's onfinish
// callback; callers are queued in IdAllocator::waitfor_save instead.
class C_ID_Save : public Context {
IdAllocator *ida;
version_t version;
- Context *onfinish;
public:
- C_ID_Save(IdAllocator *i, version_t v, Context *c) : ida(i), version(v), onfinish(c) {}
+ C_ID_Save(IdAllocator *i, version_t v) : ida(i), version(v) {}
// r (the write result) is not inspected.
void finish(int r) {
- ida->save_2(version, onfinish);
+ ida->save_2(version);
}
};
// Write the id table (version + free-id map) to its inode asynchronously.
//
// onfinish: optional callback, may be null (the declaration defaults it to 0
//           and shutdown() calls save(0)). A non-null callback is queued in
//           waitfor_save and completed from save_2() once a write covering the
//           relevant version has finished.
// v:        version the caller needs on disk. If v > 0 and a write of at least
//           that version is already in flight (v <= committing_version), no
//           new write is issued and the callback just waits for that one.
//           v == 0 always issues a write.
//
// A null onfinish is never queued: save_2() passes every queued pointer to
// finish_contexts(), and the pre-change save_2() only invoked the callback
// when it was non-null. This also matches MDLog::replay(), which only queues
// its waiter when it is non-null.
-void IdAllocator::save(Context *onfinish)
+void IdAllocator::save(Context *onfinish, version_t v)
{
+ if (v > 0 && v <= committing_version) {
+ dout(10) << "save v " << version << " - already saving "
+ << committing_version << " >= needed " << v << endl;
+ if (onfinish)
+ waitfor_save[v].push_back(onfinish);
+ return;
+ }
+
dout(10) << "save v " << version << endl;
assert(is_active());
-
+
bufferlist bl;
bl.append((char*)&version, sizeof(version));
::_encode(free.m, bl);
+ committing_version = version;
+
+ if (onfinish)
+ waitfor_save[version].push_back(onfinish);
+
// write (async)
mds->filer->write(inode,
0, bl.length(), bl,
0,
- 0, new C_ID_Save(this, version, onfinish));
+ 0, new C_ID_Save(this, version));
}
// Completion step of save(): called (via C_ID_Save) when the write of table
// version v has finished. Records v as committed_version, then completes every
// waiter queued in waitfor_save under a version <= v.
-void IdAllocator::save_2(version_t v, Context *onfinish)
+void IdAllocator::save_2(version_t v)
{
dout(10) << "save_2 v " << v << endl;
-
+
committed_version = v;
-
- if (onfinish) {
- onfinish->finish(0);
- delete onfinish;
+
// waitfor_save is a map keyed by version, so begin() is the lowest pending
// version; stop at the first entry that needs something newer than v.
+ list<Context*> ls;
+ while (!waitfor_save.empty()) {
+ if (waitfor_save.begin()->first > v) break;
+ ls.splice(ls.end(), waitfor_save.begin()->second);
+ waitfor_save.erase(waitfor_save.begin());
}
// Waiters are moved out of the map before being finished with result 0.
+ finish_contexts(ls,0);
}
//static const int STATE_COMMITTING = 3;
int state;
- version_t version, committed_version;
+ version_t version, committing_version, committed_version;
interval_set<idno_t> free; // unused ids
+ map<version_t, list<Context*> > waitfor_save;
+
public:
// Construct an allocator for MDS m whose table is stored in inode i.
// Starts in STATE_UNDEF with version, committing_version and
// committed_version all zero.
IdAllocator(MDS *m, inode_t i) :
mds(m),
inode(i),
state(STATE_UNDEF),
- version(0), committed_version(0)
+ version(0), committing_version(0), committed_version(0)
{
}
// alloc or reclaim ids
// replay defaults to false, so existing callers keep journaling; passing true
// suppresses the EAlloc journal entry.
- idno_t alloc_id();
- void reclaim_id(idno_t id);
+ idno_t alloc_id(bool replay=false);
+ void reclaim_id(idno_t id, bool replay=false);
// Current in-memory table version (incremented by alloc_id/reclaim_id).
version_t get_version() { return version; }
// Version most recently reported as written by save_2().
version_t get_committed_version() { return committed_version; }
bool is_opening() { return state == STATE_OPENING; }
void reset();
// save(): onfinish may be null. `need` (named `v` in the definition) is the
// version the caller requires on disk; 0 means always issue a write.
// save_2(): completion step, invoked through C_ID_Save with the version written.
- void save(Context *onfinish=0);
- void save_2(version_t v, Context *onfinish);
+ void save(Context *onfinish=0, version_t need=0);
+ void save_2(version_t v);
void shutdown() {
if (is_active()) save(0);
static LogEvent *decode(bufferlist &bl);
// Default textual form of an event: "event(<_type>)". Virtual, so event
// classes can supply their own output (EAlloc does).
+ virtual void print(ostream& out) {
+ out << "event(" << _type << ")";
+ }
+
+
/*** live journal ***/
/* obsolete() - is this entry committed to primary store, such that
/* replay() - replay given event
*/
// Base implementation asserts: an event type that can be replayed must
// override this. The Context parameter is dropped by this change; the new
// signature takes only the MDS and returns nothing.
- virtual void replay(MDS *m, Context *c) { assert(0); }
+ virtual void replay(MDS *m) { assert(0); }
};
// Stream a LogEvent by delegating to its virtual print(); returns the stream
// so the call can be chained. Takes the event by non-const reference because
// print() is not declared const.
+inline ostream& operator<<(ostream& out, LogEvent& le) {
+ le.print(out);
+ return out;
+}
+
#endif
void MDLog::submit_entry( LogEvent *e,
Context *c )
{
- dout(5) << "submit_entry at " << num_events << endl;
-
+ dout(5) << "submit_entry " << journaler->get_write_pos() << " : " << *e << endl;
+
if (g_conf.mds_log) {
// encode it, with event type
bufferlist bl;
void MDLog::_trimmed(LogEvent *le)
{
- dout(7) << " trimmed " << le << endl;
+ dout(7) << " trimmed " << *le << endl;
assert(le->can_expire(mds));
// we just read an event.
if (le->can_expire(mds) == true) {
// obsolete
- dout(7) << "trim obsolete " << le << endl;
+ dout(7) << "trim obsolete " << *le << endl;
delete le;
logger->inc("obs");
} else {
assert ((int)trimming.size() < g_conf.mds_log_max_trimming);
// trim!
- dout(7) << "trim trimming " << le << endl;
+ dout(7) << "trim trimming " << *le << endl;
trimming[le->_end_off] = le;
le->retire(mds, new C_MDL_Trimmed(this, le));
logger->inc("retire");
}
// Start replaying the journal from the expire position up to the write position.
// c: optional callback (may be null). If the journal is empty it is finished
//    with 0 and deleted immediately; otherwise it is queued on waitfor_replay
//    and finished by _replay() when the whole range has been read.
// Requires the journaler to be active and num_events to be 0 (both asserted).
+void MDLog::replay(Context *c)
+{
+ assert(journaler->is_active());
+
+ // start reading at the last known expire point.
+ journaler->set_read_pos( journaler->get_expire_pos() );
+
+ // empty?
+ if (journaler->get_read_pos() == journaler->get_write_pos()) {
+ dout(10) << "replay - journal empty, done." << endl;
+ if (c) {
+ c->finish(0);
+ delete c;
+ }
+ return;
+ }
+
+ // add waiter
+ if (c)
+ waitfor_replay.push_back(c);
+
+ // go!
+ dout(10) << "replay start, from " << journaler->get_read_pos()
+ << " to " << journaler->get_write_pos() << endl;
+
+ assert(num_events == 0);
+
+ _replay();
+}
+
// Context registered with journaler->wait_for_readable() by MDLog::_replay();
// when finished it re-enters _replay() to continue reading. The result code r
// is not inspected.
+class C_MDL_Replay : public Context {
+ MDLog *mdlog;
+public:
+ C_MDL_Replay(MDLog *l) : mdlog(l) {}
+ void finish(int r) { mdlog->_replay(); }
+};
+
// Replay worker. Decodes and applies every journal entry that is currently
// readable; if the write position has not been reached it registers a
// C_MDL_Replay to be called back when more is readable and returns. Once the
// read position equals the write position it rewinds the read position to the
// expire position and finishes all contexts queued on waitfor_replay.
+void MDLog::_replay()
+{
+ // read what's buffered
+ while (journaler->is_readable() &&
+ journaler->get_read_pos() < journaler->get_write_pos()) {
+ // read it
+ off_t pos = journaler->get_read_pos();
+ bufferlist bl;
+ bool r = journaler->try_read_entry(bl);
+ assert(r);
+
+ // unpack event
+ LogEvent *le = LogEvent::decode(bl);
+ num_events++;
+
// Apply the event only if has_happened() reports it is not already
// reflected in the current state.
+ if (le->has_happened(mds)) {
+ dout(10) << "_replay " << pos << " / " << journaler->get_write_pos()
+ << " : " << *le << " : already happened" << endl;
+ } else {
+ dout(10) << "_replay " << pos << " / " << journaler->get_write_pos()
+ << " : " << *le << " : applying" << endl;
+ le->replay(mds);
+ }
// The decoded event is deleted here in both branches.
+ delete le;
+ }
+
+ // wait for read?
+ if (journaler->get_read_pos() < journaler->get_write_pos()) {
+ journaler->wait_for_readable(new C_MDL_Replay(this));
+ return;
+ }
+
+ // done!
+ assert(journaler->get_read_pos() == journaler->get_write_pos());
+ dout(10) << "_replay - complete" << endl;
+
+ // move read pointer _back_ to expire pos, for eventual trimming
+ journaler->set_read_pos(journaler->get_expire_pos());
+
+ // kick waiter(s)
// Swap the waiters into a local list first, so waitfor_replay is already
// empty when the callbacks run.
+ list<Context*> ls;
+ ls.swap(waitfor_replay);
+ finish_contexts(ls,0);
+}
+
+
inode_t log_inode;
Journaler *journaler;
+
//hash_map<LogEvent*> trimming; // events currently being trimmed
map<off_t, LogEvent*> trimming;
Logger *logger;
+ list<Context*> waitfor_replay;
+
public:
MDLog(MDS *m);
~MDLog();
void reset(); // fresh, empty log!
void open(Context *onopen);
void write_head(Context *onfinish);
+
// replay(): start journal replay; onfinish (may be null) is completed once the
// journal has been read up to the write position.
// _replay(): worker that replay() calls and that C_MDL_Replay re-enters.
+ void replay(Context *onfinish);
+ void _replay();
};
#endif
case 4:
dout(2) << "boot_recover " << step << ": replaying mds log" << endl;
-
- //break;
+ mdlog->replay(new C_MDS_BootRecover(this, 5));
+ break;
case 5:
dout(2) << "boot_recover " << step << ": done." << endl;
*/
- // flush log to disk after every op. for now.
- mdlog->flush();
+ if (is_active()) {
+ // flush log to disk after every op. for now.
+ mdlog->flush();
- // trim cache
- mdcache->trim();
+ // trim cache
+ mdcache->trim();
+ }
// finish any triggered contexts
if (finished_queue.size()) {
finish_contexts(ls);
}
-
// hash root?
// periodic crap (1-second resolution)
static utime_t last_log = g_clock.recent_now();
utime_t now = g_clock.recent_now();
- if (last_log.sec() != now.sec()) {
+ if (is_active() &&
+ last_log.sec() != now.sec()) {
// log
last_log = now;
}
// Print the event as "alloc <id> tablev <version>" or "dealloc <id> tablev
// <version>". The id is written in hex and the stream is switched back to
// decimal before the table version. Any `what` other than EALLOC_EV_ALLOC is
// printed as "dealloc".
+ void print(ostream& out) {
+ if (what == EALLOC_EV_ALLOC)
+ out << "alloc " << hex << id << dec << " tablev " << table_version;
+ else
+ out << "dealloc " << hex << id << dec << " tablev " << table_version;
+ }
+
+
// live journal
bool can_expire(MDS *mds) {
if (mds->idalloc->get_committed_version() <= table_version)
}
// Ask the id allocator to save its table and queue c to be finished when that
// save completes. After this change the event's table_version is passed as
// the needed version, so save() can skip issuing a new write when one that
// covers it is already in flight.
void retire(MDS *mds, Context *c) {
- mds->idalloc->save(c);
+ mds->idalloc->save(c, table_version);
}
// recovery
// Returns true when the allocator's current version is already at or past
// this event's table_version, i.e. the event need not be applied on replay.
// NOTE(review): the added trace line writes straight to cout rather than
// going through a dout() debug level, so it is printed unconditionally.
bool has_happened(MDS *mds) {
- return (mds->idalloc->get_version() >= table_version);
+ if (mds->idalloc->get_version() >= table_version) {
+ cout << " event " << table_version << " <= table " << mds->idalloc->get_version() << endl;
+ return true;
+ } else
+ return false;
}
- void replay(MDS *mds, Context *c) {
+ void replay(MDS *mds) {
assert(table_version-1 == mds->idalloc->get_version());
if (what == EALLOC_EV_ALLOC) {
- idno_t nid = mds->idalloc->alloc_id();
+ idno_t nid = mds->idalloc->alloc_id(true);
assert(nid == id); // this should match.
}
else if (what == EALLOC_EV_FREE) {
- mds->idalloc->reclaim_id(id);
+ mds->idalloc->reclaim_id(id, true);
}
else
assert(0);