{
dout(1) << "reset to blank journal" << dendl;
state = STATE_ACTIVE;
- write_pos = flush_pos = ack_pos =
+ write_pos = flush_pos = ack_pos = safe_pos =
read_pos = requested_pos = received_pos =
expire_pos = trimming_pos = trimmed_pos = ceph_file_layout_period(inode.layout);
}
assert(bl.length() == sizeof(h));
bl.copy(0, sizeof(h), (char*)&h);
- write_pos = flush_pos = ack_pos = h.write_pos;
+ write_pos = flush_pos = ack_pos = safe_pos = h.write_pos;
read_pos = requested_pos = received_pos = h.read_pos;
expire_pos = h.expire_pos;
trimmed_pos = trimming_pos = h.trimmed_pos;
<< dendl;
}
- write_pos = flush_pos = ack_pos = end;
+ write_pos = flush_pos = ack_pos = safe_pos = end;
// done.
list<Context*> ls;
last_written.trimmed_pos = trimmed_pos;
last_written.expire_pos = expire_pos;
last_written.read_pos = read_pos;
- last_written.write_pos = ack_pos; //write_pos;
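+ // record only the safely committed position in the journal head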
+ last_written.write_pos = safe_pos;
dout(10) << "write_head " << last_written << dendl;
last_wrote_head = g_clock.now();
class Journaler::C_Flush : public Context {
Journaler *ls;
__s64 start;
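+ // true when this callback is for the osd commit (on disk), false for the ack (in memory)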
+ bool safe;
public:
- C_Flush(Journaler *l, __s64 s) : ls(l), start(s) {}
- void finish(int r) { ls->_finish_flush(r, start); }
+ C_Flush(Journaler *l, __s64 s, bool sa) : ls(l), start(s), safe(sa) {}
+ void finish(int r) { ls->_finish_flush(r, start, safe); }
};
-void Journaler::_finish_flush(int r, __s64 start)
+void Journaler::_finish_flush(int r, __s64 start, bool safe)
{
assert(r>=0);
- assert(start >= ack_pos);
+ assert((!safe && start >= ack_pos) || (safe && start >= safe_pos));
assert(start < flush_pos);
assert(pending_flush.count(start));
if (logger) {
utime_t lat = g_clock.now();
lat -= pending_flush[start];
- logger->favg("jlat", lat);
+ logger->favg(safe ? "jsafelat" : "jacklat", lat);
}
- pending_flush.erase(start);
-
- // adjust ack_pos
- if (pending_flush.empty())
-   ack_pos = flush_pos;
- else
-   ack_pos = pending_flush.begin()->first;
+ // adjust ack_pos or safe_pos
+ if (!safe) {
+   // acked but not necessarily committed: track acks separately so
+   // ack_pos can advance past flushes still waiting for their commit
+   pending_ack.erase(start);
+   if (pending_ack.empty())
+     ack_pos = flush_pos;
+   else
+     ack_pos = pending_ack.begin()->first;
+ } else {
+   // committed safely to disk
+   pending_flush.erase(start);
+   if (pending_flush.empty())
+     safe_pos = flush_pos;
+   else
+     safe_pos = pending_flush.begin()->first;
+ }
- dout(10) << "_finish_flush from " << start
+ dout(10) << "_finish_flush " << (safe ? "safe":"ack") << " from " << start
<< ", pending_flush now " << pending_flush
- << ", write positions now " << write_pos << "/" << flush_pos << "/" << ack_pos
+ << ", write positions now " << write_pos << "/" << flush_pos
+ << "/" << ack_pos << "/" << safe_pos
<< dendl;
- // kick waiters <= ack_pos
+ // kick waiters <= ack_pos (or safe_pos)
- while (!waitfor_flush.empty()) {
- if (waitfor_flush.begin()->first > ack_pos) break;
- finish_contexts(waitfor_flush.begin()->second);
- waitfor_flush.erase(waitfor_flush.begin());
+ if (!safe) {
+ while (!waitfor_ack.empty()) {
+ if (waitfor_ack.begin()->first > ack_pos) break;
+ finish_contexts(waitfor_ack.begin()->second);
+ waitfor_ack.erase(waitfor_ack.begin());
+ }
+ } else {
+ while (!waitfor_safe.empty()) {
+ if (waitfor_safe.begin()->first > safe_pos) break;
+ finish_contexts(waitfor_safe.begin()->second);
+ waitfor_safe.erase(waitfor_safe.begin());
+ }
}
}
// submit write for anything pending
// flush _start_ pos to _finish_flush
- filer.write(inode, flush_pos, len, write_buf, CEPH_OSD_OP_INCLOCK_FAIL,
- g_conf.journaler_safe ? 0:new C_Flush(this, flush_pos), // on ACK
- g_conf.journaler_safe ? new C_Flush(this, flush_pos):0); // on COMMIT
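+ // always register both callbacks: safe=false fires on the osd ack,
+ // safe=true fires on the osd commit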
+ filer.write(inode, flush_pos, len, write_buf,
+ CEPH_OSD_OP_INCLOCK_FAIL,
+ new C_Flush(this, flush_pos, false), // on ACK
+ new C_Flush(this, flush_pos, true)); // on COMMIT
pending_flush[flush_pos] = g_clock.now();
+ pending_ack[flush_pos] = g_clock.now();
// adjust pointers
-void Journaler::flush(Context *onsync)
+void Journaler::flush(Context *onsync, Context *onsafe)
{
// all flushed and acked?
if (write_pos == ack_pos) {
assert(write_buf.length() == 0);
- dout(10) << "flush nothing to flush, write pointers at " << write_pos << "/" << flush_pos << "/" << ack_pos << dendl;
+ dout(10) << "flush nothing to flush, write pointers at "
+ << write_pos << "/" << flush_pos << "/" << ack_pos << "/" << safe_pos << dendl;
if (onsync) {
onsync->finish(0);
delete onsync;
+ onsync = 0;
+ }
+ if (onsafe && write_pos == safe_pos) {
+ onsafe->finish(0);
+ delete onsafe;
+ onsafe = 0;
+ } else if (onsafe) {
+ // acked but not yet committed; complete once safe_pos reaches write_pos
+ waitfor_safe[write_pos].push_back(onsafe);
}
return;
}
if (write_pos == flush_pos) {
assert(write_buf.length() == 0);
- dout(10) << "flush nothing to flush, write pointers at " << write_pos << "/" << flush_pos << "/" << ack_pos << dendl;
+ dout(10) << "flush nothing to flush, write pointers at "
+ << write_pos << "/" << flush_pos << "/" << ack_pos << "/" << safe_pos << dendl;
} else {
if (1) {
// maybe buffer
- // queue waiter (at _new_ write_pos; will go when reached by ack_pos)
+ // queue waiters (at _new_ write_pos; they fire when ack_pos / safe_pos reaches it)
if (onsync)
- waitfor_flush[write_pos].push_back(onsync);
+ waitfor_ack[write_pos].push_back(onsync);
+ if (onsafe)
+ waitfor_safe[write_pos].push_back(onsafe);
// write head?
if (last_wrote_head.sec() + g_conf.journaler_write_head_interval < g_clock.now().sec()) {
if (flush_pos == ack_pos)
flush();
assert(flush_pos > ack_pos);
- waitfor_flush[flush_pos].push_back(new C_RetryRead(this));
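+ // wait until the flush is acked, then retry the read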
+ waitfor_ack[flush_pos].push_back(new C_RetryRead(this));
return;
}
// partial fragment at the end?
if (received_pos == write_pos) {
dout(10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << dendl;
- write_pos = flush_pos = ack_pos = read_pos;
+ write_pos = flush_pos = ack_pos = safe_pos = read_pos;
assert(write_buf.length() == 0);
// truncate?
__s64 write_pos; // logical write position, where next entry will go
__s64 flush_pos; // where we will flush. if write_pos>flush_pos, we're buffering writes.
__s64 ack_pos; // what has been acked.
+ __s64 safe_pos; // what has been committed safely to disk.
bufferlist write_buf; // write buffer. flush_pos + write_buf.length() == write_pos.
std::map<__s64, utime_t> pending_flush; // start offsets and times for pending flushes
+ std::map<__s64, utime_t> pending_ack; // pending flushes that have not yet been acked
- std::map<__s64, std::list<Context*> > waitfor_flush; // when flushed through given offset
+ std::map<__s64, std::list<Context*> > waitfor_ack; // when flushed through given offset
+ std::map<__s64, std::list<Context*> > waitfor_safe; // when safe through given offset
void _do_flush();
- void _finish_flush(int r, __s64 start);
+ void _finish_flush(int r, __s64 start, bool safe);
class C_Flush;
friend class C_Flush;
inode(inode_), objecter(obj), filer(objecter), logger(l),
lock(lk), timer(*lk), delay_flush_event(0),
state(STATE_UNDEF), error(0),
- write_pos(0), flush_pos(0), ack_pos(0),
+ write_pos(0), flush_pos(0), ack_pos(0), safe_pos(0),
read_pos(0), requested_pos(0), received_pos(0),
fetch_len(fl), prefetch_from(pff),
read_bl(0), on_read_finish(0), on_readable(0),
// write
__s64 append_entry(bufferlist& bl, Context *onsync = 0);
- void flush(Context *onsync = 0);
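+ // onsync completes when the data has been acked; onsafe when it has been committed to disk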
+ void flush(Context *onsync = 0, Context *onsafe = 0);
// read
void set_read_pos(__s64 p) {