class Journaler::C_Flush : public Context {
Journaler *ls;
__s64 start;
+ utime_t stamp;  // submission time, carried in the callback so _finish_flush no longer looks it up in pending_flush
bool safe;
public:
- C_Flush(Journaler *l, __s64 s, bool sa) : ls(l), start(s), safe(sa) {}
- void finish(int r) { ls->_finish_flush(r, start, safe); }
+ C_Flush(Journaler *l, __s64 s, utime_t st, bool sa) : ls(l), start(s), stamp(st), safe(sa) {}  // st: time the write was submitted
+ void finish(int r) { ls->_finish_flush(r, start, stamp, safe); }  // forwards the stamp for latency accounting
};
-void Journaler::_finish_flush(int r, __s64 start, bool safe)
+void Journaler::_finish_flush(int r, __s64 start, utime_t stamp, bool safe)
{
assert(r>=0);
assert((!safe && start >= ack_pos) || (safe && start >= safe_pos));
assert(start < flush_pos);
- assert(pending_flush.count(start));
// calc latency?
if (logger) {
utime_t lat = g_clock.now();
- lat -= pending_flush[start];
+ lat -= stamp;  // latency = now minus the submission time carried in the callback
logger->favg(safe ? "jsafelat" : "jacklat", lat);
}
// adjust ack_pos
if (!safe) {
- ack_pos = pending_flush.begin()->first;
+ assert(pending_ack.count(start));  // acks and commits now tracked in separate sets
+ pending_ack.erase(start);
+ if (pending_ack.empty())
+ ack_pos = flush_pos;  // nothing in flight: everything flushed so far is acked
+ else
+ ack_pos = *pending_ack.begin();  // ordered set: begin() is the lowest still-pending offset
+ if (!ack_barrier.empty() && *ack_barrier.begin() < ack_pos)
+ ack_pos = *ack_barrier.begin();  // never advance ack_pos past an outstanding barrier
} else {
- pending_flush.erase(start);
- if (pending_flush.empty())
+ assert(pending_safe.count(start));
+ pending_safe.erase(start);
+ if (pending_safe.empty())
safe_pos = flush_pos;
else
- safe_pos = pending_flush.begin()->first;
+ safe_pos = *pending_safe.begin();  // lowest offset still awaiting commit
+
+ if (ack_barrier.count(start)) {  // a barrier at this offset is released only once the write is safe
+ ack_barrier.erase(start);
+
+ if (ack_pos == start) {  // ack_pos was pinned at this barrier; recompute it now
+ if (pending_ack.empty())
+ ack_pos = flush_pos;
+ else
+ ack_pos = *pending_ack.begin();
+ if (!ack_barrier.empty() && *ack_barrier.begin() < ack_pos)
+ ack_pos = *ack_barrier.begin();  // may still be capped by a later barrier
+ }
+ }
}
dout(10) << "_finish_flush " << (safe ? "safe":"ack") << " from " << start
- << ", pending_flush now " << pending_flush
+ << ", pending_ack " << pending_ack
+ //<< ", pending_safe " << pending_safe
+ << ", ack_barrier " << ack_barrier
<< ", write positions now " << write_pos << "/" << flush_pos
<< "/" << ack_pos << "/" << safe_pos
<< dendl;
// submit write for anything pending
// flush _start_ pos to _finish_flush
+ utime_t now = g_clock.now();  // one shared timestamp for both completions — ack/safe latency measured from the same instant
filer.write(inode, flush_pos, len, write_buf,
CEPH_OSD_OP_INCLOCK_FAIL,
- new C_Flush(this, flush_pos, false), // on ACK
- new C_Flush(this, flush_pos, true)); // on COMMIT
+ new C_Flush(this, flush_pos, now, false), // on ACK
+ new C_Flush(this, flush_pos, now, true)); // on COMMIT
+ pending_ack.insert(flush_pos);  // replaces the old pending_flush map: offset tracked until acked...
+ pending_safe.insert(flush_pos);  // ...and, separately, until committed safe
// adjust pointers
flush_pos = write_pos;
-void Journaler::flush(Context *onsync, Context *onsafe)
+void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier)
{
// all flushed and acked?
if (write_pos == ack_pos) {
delete onsync;
onsync = 0;
}
- if (onsafe && write_pos == safe_pos) {
- onsafe->finish(0);
- delete onsafe;
- onsafe = 0;
+ if (onsafe) {
+ if (write_pos == safe_pos) {
+ onsafe->finish(0);  // already committed through write_pos: complete synchronously
+ delete onsafe;
+ onsafe = 0;
+ } else {
+ waitfor_safe[write_pos].push_back(onsafe);  // acked but not yet safe — previously this waiter was dropped; queue it for the commit
+ }
}
return;
}
waitfor_ack[write_pos].push_back(onsync);
if (onsafe)
waitfor_safe[write_pos].push_back(onsafe);
+ if (add_ack_barrier)
+ ack_barrier.insert(write_pos);  // hold ack_pos at this offset until the write is safe on disk
// write head?
if (last_wrote_head.sec() + g_conf.journaler_write_head_interval < g_clock.now().sec()) {
__s64 safe_pos; // what has been committed safely to disk.
bufferlist write_buf; // write buffer. flush_pos + write_buf.length() == write_pos.
- std::map<__s64, utime_t> pending_flush; // start offsets and times for pending flushes
+ std::set<__s64> pending_ack, pending_safe;  // start offsets of in-flight writes awaiting ack / commit (timestamps now ride in C_Flush)
std::map<__s64, std::list<Context*> > waitfor_ack; // when flushed through given offset
std::map<__s64, std::list<Context*> > waitfor_safe; // when safe through given offset
+ std::set<__s64> ack_barrier;  // offsets ack_pos may not advance past until the write at that offset is safe
void _do_flush();
- void _finish_flush(int r, __s64 start, bool safe);
+ void _finish_flush(int r, __s64 start, utime_t stamp, bool safe);  // stamp: submission time, for latency stats
class C_Flush;
friend class C_Flush;
__s64 get_write_pos() const { return write_pos; }
__s64 get_write_ack_pos() const { return ack_pos; }
+ __s64 get_write_safe_pos() const { return safe_pos; }  // expose the committed-to-disk position
__s64 get_read_pos() const { return read_pos; }
__s64 get_expire_pos() const { return expire_pos; }
__s64 get_trimmed_pos() const { return trimmed_pos; }
// write
__s64 append_entry(bufferlist& bl, Context *onsync = 0);
- void flush(Context *onsync = 0, Context *onsafe = 0);
+ void flush(Context *onsync = 0, Context *onsafe = 0, bool add_ack_barrier=false);  // barrier defaults off — backward compatible
// read
void set_read_pos(__s64 p) {