- should we pad with zeros to avoid splitting individual entries?
- make it a g_conf flag?
- have to fix reader to skip over zeros (either <4 bytes for size, or zeroed sizes)
-- need to truncate at detected (valid) write_pos to clear out any other partial trailing writes
mon
}
};
+struct C_MDL_ReplayTruncated : public Context {
+ MDLog *mdl;
+ C_MDL_ReplayTruncated(MDLog *l) : mdl(l) {}
+ void finish(int r) {
+ mdl->mds->mds_lock.Lock();
+ mdl->_replay_truncated();
+ mdl->mds->mds_lock.Unlock();
+ }
+};
+
// i am a separate thread
// done!
if (r == 0) {
assert(journaler->get_read_pos() == journaler->get_write_pos());
- dout(10) << "_replay - complete, " << num_events << " events, new read/expire pos is " << new_expire_pos << dendl;
-
+ dout(10) << "_replay - complete, " << num_events
+ << " events, new read/expire pos is " << new_expire_pos << dendl;
+
// move read pointer _back_ to first subtree map we saw, for eventual trimming
journaler->set_read_pos(new_expire_pos);
journaler->set_expire_pos(new_expire_pos);
logger->set(l_mdl_expos, new_expire_pos);
+
+ dout(10) << "_replay - truncating at " << journaler->get_write_pos() << dendl;
+ Context *c = new C_MDL_ReplayTruncated(this);
+ if (journaler->truncate_tail_junk(c)) {
+ delete c;
+
+ dout(10) << "_replay_thread nothing to truncate, kicking waiters" << dendl;
+ finish_contexts(waitfor_replay, 0);
+ }
+ } else {
+ dout(10) << "_replay_thread kicking waiters" << dendl;
+ finish_contexts(waitfor_replay, r);
}
- // kick waiter(s)
- list<Context*> ls;
- ls.swap(waitfor_replay);
- finish_contexts(ls, r);
-
dout(10) << "_replay_thread finish" << dendl;
mds->mds_lock.Unlock();
}
+void MDLog::_replay_truncated()
+{
+ dout(10) << "_replay_truncated" << dendl;
+ finish_contexts(waitfor_replay, 0);
+}
+
class MDLog {
- protected:
+public:
MDS *mds;
+protected:
int num_events; // in events
int unflushed;
void _replay(); // old way
void _replay_thread(); // new way
+ void _replay_truncated();
+ friend class C_MDL_ReplayTruncated;
// -- segments --
// partial fragment at the end?
if (received_pos == write_pos) {
dout(10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << dendl;
+ if (write_pos > read_pos)
+ junk_tail_pos = write_pos; // note old tail
write_pos = flush_pos = ack_pos = safe_pos = read_pos;
assert(write_buf.length() == 0);
return false;
}
+bool Journaler::truncate_tail_junk(Context *c)
+{
+ if (!junk_tail_pos) {
+ dout(10) << "truncate_tail_junk -- no trailing junk" << dendl;
+ return true;
+ }
+
+ __s64 len = junk_tail_pos - write_pos;
+ dout(10) << "truncate_tail_junk " << write_pos << "~" << len << dendl;
+ SnapContext snapc;
+ filer.zero(ino, &layout, snapc, write_pos, len, g_clock.now(), 0, NULL, c);
+ return false;
+}
+
/* try_read_entry(bl)
* read entry into bl if it's ready.
__s64 fetch_len; // how much to read at a time
__s64 prefetch_from; // how far from end do we read next chunk
+ __s64 junk_tail_pos; // for truncate
+
// for read_entry() in-progress read
bufferlist *read_bl;
Context *on_read_finish;
write_pos(0), flush_pos(0), ack_pos(0), safe_pos(0),
read_pos(0), requested_pos(0), received_pos(0),
fetch_len(0), prefetch_from(0),
+ junk_tail_pos(0),
read_bl(0), on_read_finish(0), on_readable(0),
expire_pos(0), trimming_pos(0), trimmed_pos(0)
{
void wait_for_readable(Context *onfinish);
void read_entry(bufferlist* bl, Context *onfinish);
+ bool truncate_tail_junk(Context *fin);
+
// trim
void set_expire_pos(__s64 ep) { expire_pos = ep; }
void trim();