class LogSegment {
public:
- off_t offset, end;
+ loff_t offset, end;
int num_events;
+ loff_t trimmable_at;
// dirty items
xlist<CDir*> dirty_dirfrags;
C_Gather *try_to_expire(MDS *mds);
// constructor
- LogSegment(off_t off) : offset(off), end(off), num_events(0),
+ LogSegment(loff_t off) : offset(off), end(off), num_events(0), trimmable_at(0),
allocv(0), sessionmapv(0), anchortablev(0)
{ }
};
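The off_t -> loff_t switch above matters on 32-bit builds: off_t is only 64 bits wide when compiled with -D_FILE_OFFSET_BITS=64, whereas Linux's loff_t is always 64 bits, so journal offsets past 2GB cannot wrap. A one-line illustration (sketch only, not part of the patch):

    #include <sys/types.h>
    // always holds on Linux; the same assertion on off_t can fail on
    // 32-bit builds without _FILE_OFFSET_BITS=64
    static_assert(sizeof(loff_t) == 8, "journal offsets must be 64-bit");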
le->metablob.add_primary_dentry(in->get_parent_dn(), true, 0, pi);
mds->mdlog->submit_entry(le);
- mds->mdlog->wait_for_sync(new C_MDC_SubtreeMergeWB(this, in,
+ mds->mdlog->wait_for_safe(new C_MDC_SubtreeMergeWB(this, in,
mds->mdlog->get_current_segment()));
}
}
journaler->write_head(c);
}
-off_t MDLog::get_read_pos()
+loff_t MDLog::get_read_pos()
{
return journaler->get_read_pos();
}
-off_t MDLog::get_write_pos()
+loff_t MDLog::get_write_pos()
{
return journaler->get_write_pos();
}
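+// get_write_pos() is how far we have submitted events to the journaler;
+// get_safe_pos() is how far the journaler knows to be committed durably
+// to disk, which is always <= the write position.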
+loff_t MDLog::get_safe_pos()
+{
+ return journaler->get_write_safe_pos();
+}
+
void MDLog::create(Context *c)
delete c;
}
}
+void MDLog::wait_for_safe( Context *c )
+{
+ if (g_conf.mds_log) {
+    // flush the journal and wait for the entry to be safe on disk
+ journaler->flush(0, c);
+ } else {
+    // hack: journaling is disabled; complete the context immediately.
+ c->finish(0);
+ delete c;
+ }
+}
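+// Typical use (sketch; C_Foo stands in for any Context subclass): queue
+// the event, then park the completion on wait_for_safe() so it runs only
+// once the entry is durable:
+//   mdlog->submit_entry(le);
+//   mdlog->wait_for_safe(new C_Foo(...));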
void MDLog::flush()
{
size_t get_num_segments() { return segments.size(); }
void set_max_segments(int m) { max_segments = m; }
- off_t get_read_pos();
- off_t get_write_pos();
+ loff_t get_read_pos();
+ loff_t get_write_pos();
+ loff_t get_safe_pos();
bool empty() { return segments.empty(); }
bool is_capped() { return capped; }
void submit_entry( LogEvent *e, Context *c = 0 );
void wait_for_sync( Context *c );
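+  // like wait_for_sync(), but the context fires only once the entry is
+  // committed safely to disk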
+ void wait_for_safe( Context *c );
void flush();
bool is_flushed() {
return unflushed == 0;
if (is_active() || is_stopping()) {
// flush log to disk after every op. for now.
//mdlog->flush();
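+    // also trim the log (expire old segments) on each tick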
+ mdlog->trim();
// trim cache
mdcache->trim();
}
// log export completion, then finish (unfreeze, trigger finish context, etc.)
- mds->mdlog->submit_entry(le,
- new C_MDS_ExportFinishLogged(this, dir));
+ mds->mdlog->submit_entry(le);
+ mds->mdlog->wait_for_safe(new C_MDS_ExportFinishLogged(this, dir));
delete m;
}
dout(7) << "handle_export_dir did " << *dir << dendl;
// log it
- mds->mdlog->submit_entry(le, onlogged);
+ mds->mdlog->submit_entry(le);
+ mds->mdlog->wait_for_safe(onlogged);
// note state
import_state[dir->dirfrag()] = IMPORT_LOGGINGSTART;
mds->server->prepare_force_open_sessions(ex->client_map);
le->client_map.swap(ex->client_map);
- mds->mdlog->submit_entry(le, finish);
+ mds->mdlog->submit_entry(le);
+ mds->mdlog->wait_for_safe(finish);
delete ex;
}
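All three migration call sites above get the same treatment: the completion used to ride along on submit_entry(), which fires once the entry is flushed, and is now parked on wait_for_safe() so it fires only once the entry is committed to disk. The pattern, side by side (fin stands for the respective completion Context):

    // before: fin fires when the entry is flushed (synced)
    mds->mdlog->submit_entry(le, fin);

    // after: fin fires only once the entry is safe on disk
    mds->mdlog->submit_entry(le);
    mds->mdlog->wait_for_safe(fin);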
// log + wait
C_MDS_openc_finish *fin = new C_MDS_openc_finish(mds, mdr, dn, in);
mdlog->submit_entry(le, fin);
-
- /*
- FIXME. this needs to be rewritten when the write capability stuff starts
- getting journaled.
- */
}
// FIXME client requests...?
// audit handling of anchor transactions?
+ // once we are otherwise trimmable, make sure journal is fully safe on disk.
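+  // (Two-phase handshake: the first pass records the current write_pos
+  //  in trimmable_at and waits for a safe flush; a later pass finds
+  //  trimmable_at <= get_safe_pos() and lets the segment expire.)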
+ if (!gather) {
+ if (trimmable_at &&
+ trimmable_at <= mds->mdlog->get_safe_pos()) {
+ dout(6) << "LogSegment(" << offset << ").try_to_expire trimmable at " << trimmable_at
+ << " <= " << mds->mdlog->get_safe_pos() << dendl;
+ } else {
+ if (trimmable_at == 0) {
+ trimmable_at = mds->mdlog->get_write_pos();
+ dout(6) << "LogSegment(" << offset << ").try_to_expire now trimmable at " << trimmable_at
+ << ", waiting for safe journal flush" << dendl;
+ } else {
+ dout(6) << "LogSegment(" << offset << ").try_to_expire trimmable at " << trimmable_at
+ << " > " << mds->mdlog->get_safe_pos()
+ << ", waiting for safe journal flush" << dendl;
+ }
+    gather = new C_Gather;  // gather is known to be 0 here (outer check)
+ mds->mdlog->wait_for_safe(gather->new_sub());
+ }
+ }
+
if (gather) {
dout(6) << "LogSegment(" << offset << ").try_to_expire waiting" << dendl;
} else {