logger->set(l_mdl_expos, journaler->get_expire_pos());
logger->set(l_mdl_wrpos, journaler->get_write_pos());
+
+ submit_thread.create();
}
void MDLog::open(Context *c)
recovery_thread.set_completion(c);
recovery_thread.create();
recovery_thread.detach();
+
+ submit_thread.create();
// either append() or replay() will follow.
}
delete le;
}
+void MDLog::_submit_thread()
+{
+ dout(10) << "_submit_thread start" << dendl;
+
+ submit_mutex.Lock();
+
+ while (!stopping) {
+ map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
+ if (it == pending_events.end()) {
+ submit_cond.Wait(submit_mutex);
+ continue;
+ }
+
+ if (it->second.empty()) {
+ pending_events.erase(it);
+ continue;
+ }
+
+ PendingEvent data = it->second.front();
+ it->second.pop_front();
+
+ submit_mutex.Unlock();
+
+ if (data.le) {
+ LogEvent *le = data.le;
+ LogSegment *ls = le->_segment;
+ // encode it, with event type
+ bufferlist bl;
+ le->encode_with_header(bl);
+
+ mds->mds_lock.Lock();
+
+ uint64_t write_pos = journaler->get_write_pos();
+
+ le->set_start_off(write_pos);
+ if (le->get_type() == EVENT_SUBTREEMAP)
+ ls->offset = write_pos;
+
+ dout(5) << "_submit_thread " << write_pos << "~" << bl.length()
+ << " : " << *le << dendl;
+
+ // journal it.
+ journaler->append_entry(bl); // bl is destroyed.
+ ls->end = journaler->get_write_pos();
+
+ if (data.fin)
+ journaler->wait_for_flush(data.fin);
+ if (data.flush)
+ journaler->flush();
+
+ mds->mds_lock.Unlock();
+
+ if (logger)
+ logger->set(l_mdl_wrpos, ls->end);
+
+ delete le;
+ } else {
+ mds->mds_lock.Lock();
+ if (data.fin)
+ journaler->wait_for_flush(data.fin);
+ if (data.flush)
+ journaler->flush();
+ mds->mds_lock.Unlock();
+ }
+
+ submit_mutex.Lock();
+ if (data.flush)
+ unflushed = 0;
+ else if (data.le)
+ unflushed++;
+ }
+
+ submit_mutex.Unlock();
+}
+
void MDLog::wait_for_safe(Context *c)
{
if (g_conf->mds_log) {
capped = true;
}
+void MDLog::shutdown()
+{
+ dout(5) << "shutdown" << dendl;
+ if (!submit_thread.is_started())
+ return;
+
+ assert(mds->mds_lock.is_locked_by_me());
+ mds->mds_lock.Unlock();
+
+ submit_mutex.Lock();
+ stopping = true;
+ submit_cond.Signal();
+ submit_mutex.Unlock();
+
+ mds->mds_lock.Lock();
+}
+
// -----------------------------
// segments
bool capped;
+ bool stopping;
+
inodeno_t ino;
Journaler *journaler;
int expiring_events;
int expired_events;
+ struct PendingEvent {
+ LogEvent *le;
+ Context *fin;
+ bool flush;
+ PendingEvent(LogEvent *e, Context *c, bool f=false) : le(e), fin(c), flush(f) {}
+ };
+
+ map<uint64_t,list<PendingEvent> > pending_events; // log segment -> event list
+ Mutex submit_mutex;
+ Cond submit_cond;
+
+ void _submit_thread();
+ class SubmitThread : public Thread {
+ MDLog *log;
+ public:
+ SubmitThread(MDLog *l) : log(l) {}
+ void* entry() {
+ log->_submit_thread();
+ return 0;
+ }
+ } submit_thread;
+ friend class SubmitThread;
+
// -- subtreemaps --
friend class ESubtreeMap;
friend class C_MDS_WroteImportMap;
num_events(0),
unflushed(0),
capped(false),
+ stopping(false),
journaler(0),
logger(0),
replay_thread(this),
already_replayed(false),
recovery_thread(this),
event_seq(0), expiring_events(0), expired_events(0),
+ submit_mutex("MDLog::submit_mutex"),
+ submit_thread(this),
cur_event(NULL) { }
~MDLog();
bool is_capped() { return capped; }
void cap();
+ void shutdown();
+
// -- events --
private:
LogEvent *cur_event;