From: Sage Weil Date: Tue, 10 Jun 2008 13:40:30 +0000 (-0700) Subject: journaler: ack barriers X-Git-Tag: v0.3~142 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4daed641c0af0be5823e67e68e7930de44cc8277;p=ceph.git journaler: ack barriers --- diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index 3d11a039fe2b..b43a6cd90e1c 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -191,40 +191,63 @@ void Journaler::_finish_write_head(Header &wrote, Context *oncommit) class Journaler::C_Flush : public Context { Journaler *ls; __s64 start; + utime_t stamp; bool safe; public: - C_Flush(Journaler *l, __s64 s, bool sa) : ls(l), start(s), safe(sa) {} - void finish(int r) { ls->_finish_flush(r, start, safe); } + C_Flush(Journaler *l, __s64 s, utime_t st, bool sa) : ls(l), start(s), stamp(st), safe(sa) {} + void finish(int r) { ls->_finish_flush(r, start, stamp, safe); } }; -void Journaler::_finish_flush(int r, __s64 start, bool safe) +void Journaler::_finish_flush(int r, __s64 start, utime_t stamp, bool safe) { assert(r>=0); assert((!safe && start >= ack_pos) || (safe && start >= safe_pos)); assert(start < flush_pos); - assert(pending_flush.count(start)); // calc latency? if (logger) { utime_t lat = g_clock.now(); - lat -= pending_flush[start]; + lat -= stamp; logger->favg(safe ? 
"jsafelat" : "jacklat", lat); } // adjust ack_pos if (!safe) { - ack_pos = pending_flush.begin()->first; + assert(pending_ack.count(start)); + pending_ack.erase(start); + if (pending_ack.empty()) + ack_pos = flush_pos; + else + ack_pos = *pending_ack.begin(); + if (!ack_barrier.empty() && *ack_barrier.begin() < ack_pos) + ack_pos = *ack_barrier.begin(); } else { - pending_flush.erase(start); - if (pending_flush.empty()) + assert(pending_safe.count(start)); + pending_safe.erase(start); + if (pending_safe.empty()) safe_pos = flush_pos; else - safe_pos = pending_flush.begin()->first; + safe_pos = *pending_safe.begin(); + + if (ack_barrier.count(start)) { + ack_barrier.erase(start); + + if (ack_pos == start) { + if (pending_ack.empty()) + ack_pos = flush_pos; + else + ack_pos = *pending_ack.begin(); + if (!ack_barrier.empty() && *ack_barrier.begin() < ack_pos) + ack_pos = *ack_barrier.begin(); + } + } } dout(10) << "_finish_flush " << (safe ? "safe":"ack") << " from " << start - << ", pending_flush now " << pending_flush + << ", pending_ack " << pending_ack + //<< ", pending_safe " << pending_safe + << ", ack_barrier " << ack_barrier << ", write positions now " << write_pos << "/" << flush_pos << "/" << ack_pos << "/" << safe_pos << dendl; @@ -314,11 +337,13 @@ void Journaler::_do_flush() // submit write for anything pending // flush _start_ pos to _finish_flush + utime_t now = g_clock.now(); filer.write(inode, flush_pos, len, write_buf, CEPH_OSD_OP_INCLOCK_FAIL, - new C_Flush(this, flush_pos, false), // on ACK - new C_Flush(this, flush_pos, true)); // on COMMIT - pending_flush[flush_pos] = g_clock.now(); + new C_Flush(this, flush_pos, now, false), // on ACK + new C_Flush(this, flush_pos, now, true)); // on COMMIT + pending_ack.insert(flush_pos); + pending_safe.insert(flush_pos); // adjust pointers flush_pos = write_pos; @@ -329,7 +354,7 @@ void Journaler::_do_flush() -void Journaler::flush(Context *onsync, Context *onsafe) +void Journaler::flush(Context *onsync, 
Context *onsafe, bool add_ack_barrier) { // all flushed and acked? if (write_pos == ack_pos) { @@ -341,10 +366,14 @@ void Journaler::flush(Context *onsync, Context *onsafe) delete onsync; onsync = 0; } - if (onsafe && write_pos == safe_pos) { - onsafe->finish(0); - delete onsafe; - onsafe = 0; + if (onsafe) { + if (write_pos == safe_pos) { + onsafe->finish(0); + delete onsafe; + onsafe = 0; + } else { + waitfor_safe[write_pos].push_back(onsafe); + } } return; } @@ -377,6 +406,8 @@ void Journaler::flush(Context *onsync, Context *onsafe) waitfor_ack[write_pos].push_back(onsync); if (onsafe) waitfor_safe[write_pos].push_back(onsafe); + if (add_ack_barrier) + ack_barrier.insert(write_pos); // write head? if (last_wrote_head.sec() + g_conf.journaler_write_head_interval < g_clock.now().sec()) { diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 7daed2043589..410760300ae0 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -140,12 +140,13 @@ public: __s64 safe_pos; // what has been committed safely to disk. bufferlist write_buf; // write buffer. flush_pos + write_buf.length() == write_pos. 
- std::map<__s64, utime_t> pending_flush; // start offsets and times for pending flushes + std::set<__s64> pending_ack, pending_safe; std::map<__s64, std::list<Context*> > waitfor_ack; // when flushed through given offset std::map<__s64, std::list<Context*> > waitfor_safe; // when safe through given offset + std::set<__s64> ack_barrier; void _do_flush(); - void _finish_flush(int r, __s64 start, bool safe); + void _finish_flush(int r, __s64 start, utime_t stamp, bool safe); class C_Flush; friend class C_Flush; @@ -223,13 +224,14 @@ public: __s64 get_write_pos() const { return write_pos; } __s64 get_write_ack_pos() const { return ack_pos; } + __s64 get_write_safe_pos() const { return safe_pos; } __s64 get_read_pos() const { return read_pos; } __s64 get_expire_pos() const { return expire_pos; } __s64 get_trimmed_pos() const { return trimmed_pos; } // write __s64 append_entry(bufferlist& bl, Context *onsync = 0); - void flush(Context *onsync = 0, Context *onsafe = 0); + void flush(Context *onsync = 0, Context *onsafe = 0, bool add_ack_barrier=false); // read void set_read_pos(__s64 p) {