From: Sage Weil Date: Sat, 7 Jun 2008 14:06:31 +0000 (-0700) Subject: journaler: wait for ack and safe.. ack_pos flushed thing still wonky tho X-Git-Tag: v0.3~143 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9224945c2e7c97c4c78af57aca89a9375a8937ee;p=ceph.git journaler: wait for ack and safe.. ack_pos flushed thing still wonky tho --- diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 88ec35d5d5f..04b4eda42f4 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -781,7 +781,8 @@ enum { CEPH_OSD_OP_SAFE = 2, /* want (or is) "safe" ack */ CEPH_OSD_OP_RETRY = 4, /* resend attempt */ CEPH_OSD_OP_INCLOCK_FAIL = 8, /* fail on inclock collision */ - CEPH_OSD_OP_BALANCE_READS = 16 + CEPH_OSD_OP_BALANCE_READS = 16, + CEPH_OSD_OP_ACKNVRAM = 32, /* ACK when stable in NVRAM, not RAM */ }; struct ceph_osd_peer_stat { diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index 52b0250383f..3d11a039fe2 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -29,7 +29,7 @@ void Journaler::reset() { dout(1) << "reset to blank journal" << dendl; state = STATE_ACTIVE; - write_pos = flush_pos = ack_pos = + write_pos = flush_pos = ack_pos = safe_pos = read_pos = requested_pos = received_pos = expire_pos = trimming_pos = trimmed_pos = ceph_file_layout_period(inode.layout); } @@ -102,7 +102,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) assert(bl.length() == sizeof(h)); bl.copy(0, sizeof(h), (char*)&h); - write_pos = flush_pos = ack_pos = h.write_pos; + write_pos = flush_pos = ack_pos = safe_pos = h.write_pos; read_pos = requested_pos = received_pos = h.read_pos; expire_pos = h.expire_pos; trimmed_pos = trimming_pos = h.trimmed_pos; @@ -133,7 +133,7 @@ void Journaler::_finish_probe_end(int r, __s64 end) << dendl; } - write_pos = flush_pos = ack_pos = end; + write_pos = flush_pos = ack_pos = safe_pos = end; // done. list ls; @@ -161,7 +161,7 @@ void Journaler::write_head(Context *oncommit) last_written.trimmed_pos = trimmed_pos; last_written.expire_pos = expire_pos; last_written.read_pos = read_pos; - last_written.write_pos = ack_pos; //write_pos; + last_written.write_pos = safe_pos; dout(10) << "write_head " << last_written << dendl; last_wrote_head = g_clock.now(); @@ -191,16 +191,17 @@ void Journaler::_finish_write_head(Header &wrote, Context *oncommit) class Journaler::C_Flush : public Context { Journaler *ls; __s64 start; + bool safe; public: - C_Flush(Journaler *l, __s64 s) : ls(l), start(s) {} - void finish(int r) { ls->_finish_flush(r, start); } + C_Flush(Journaler *l, __s64 s, bool sa) : ls(l), start(s), safe(sa) {} + void finish(int r) { ls->_finish_flush(r, start, safe); } }; -void Journaler::_finish_flush(int r, __s64 start) +void Journaler::_finish_flush(int r, __s64 start, bool safe) { assert(r>=0); - assert(start >= ack_pos); + assert((!safe && start >= ack_pos) || (safe && start >= safe_pos)); assert(start < flush_pos); assert(pending_flush.count(start)); @@ -208,27 +209,40 @@ void Journaler::_finish_flush(int r, __s64 start) if (logger) { utime_t lat = g_clock.now(); lat -= pending_flush[start]; - logger->favg("jlat", lat); + logger->favg(safe ? "jsafelat" : "jacklat", lat); } - pending_flush.erase(start); - // adjust ack_pos - if (pending_flush.empty()) - ack_pos = flush_pos; - else + if (!safe) { ack_pos = pending_flush.begin()->first; + } else { + pending_flush.erase(start); + if (pending_flush.empty()) + safe_pos = flush_pos; + else + safe_pos = pending_flush.begin()->first; + } - dout(10) << "_finish_flush from " << start + dout(10) << "_finish_flush " << (safe ? "safe":"ack") << " from " << start << ", pending_flush now " << pending_flush - << ", write positions now " << write_pos << "/" << flush_pos << "/" << ack_pos + << ", write positions now " << write_pos << "/" << flush_pos + << "/" << ack_pos << "/" << safe_pos << dendl; // kick waiters <= ack_pos - while (!waitfor_flush.empty()) { - if (waitfor_flush.begin()->first > ack_pos) break; - finish_contexts(waitfor_flush.begin()->second); - waitfor_flush.erase(waitfor_flush.begin()); + if (!safe) { + while (!waitfor_ack.empty()) { + if (waitfor_ack.begin()->first > ack_pos) break; + finish_contexts(waitfor_ack.begin()->second); + waitfor_ack.erase(waitfor_ack.begin()); + } + } else { + while (!waitfor_safe.empty()) { + if (waitfor_safe.begin()->first > safe_pos) break; + finish_contexts(waitfor_safe.begin()->second); + waitfor_safe.erase(waitfor_safe.begin()); + } + } } @@ -300,9 +314,10 @@ void Journaler::_do_flush() // submit write for anything pending // flush _start_ pos to _finish_flush - filer.write(inode, flush_pos, len, write_buf, CEPH_OSD_OP_INCLOCK_FAIL, - g_conf.journaler_safe ? 0:new C_Flush(this, flush_pos), // on ACK - g_conf.journaler_safe ? new C_Flush(this, flush_pos):0); // on COMMIT + filer.write(inode, flush_pos, len, write_buf, + CEPH_OSD_OP_INCLOCK_FAIL, + new C_Flush(this, flush_pos, false), // on ACK + new C_Flush(this, flush_pos, true)); // on COMMIT pending_flush[flush_pos] = g_clock.now(); // adjust pointers @@ -314,22 +329,30 @@ void Journaler::_do_flush() -void Journaler::flush(Context *onsync) +void Journaler::flush(Context *onsync, Context *onsafe) { // all flushed and acked? if (write_pos == ack_pos) { assert(write_buf.length() == 0); - dout(10) << "flush nothing to flush, write pointers at " << write_pos << "/" << flush_pos << "/" << ack_pos << dendl; + dout(10) << "flush nothing to flush, write pointers at " + << write_pos << "/" << flush_pos << "/" << ack_pos << "/" << safe_pos << dendl; if (onsync) { onsync->finish(0); delete onsync; + onsync = 0; + } + if (onsafe && write_pos == safe_pos) { + onsafe->finish(0); + delete onsafe; + onsafe = 0; } return; } if (write_pos == flush_pos) { assert(write_buf.length() == 0); - dout(10) << "flush nothing to flush, write pointers at " << write_pos << "/" << flush_pos << "/" << ack_pos << dendl; + dout(10) << "flush nothing to flush, write pointers at " + << write_pos << "/" << flush_pos << "/" << ack_pos << "/" << safe_pos << dendl; } else { if (1) { // maybe buffer @@ -351,7 +374,9 @@ void Journaler::flush(Context *onsync) // queue waiter (at _new_ write_pos; will go when reached by ack_pos) if (onsync) - waitfor_flush[write_pos].push_back(onsync); + waitfor_ack[write_pos].push_back(onsync); + if (onsafe) + waitfor_safe[write_pos].push_back(onsafe); // write head? if (last_wrote_head.sec() + g_conf.journaler_write_head_interval < g_clock.now().sec()) { @@ -455,7 +480,7 @@ void Journaler::_issue_read(__s64 len) if (flush_pos == ack_pos) flush(); assert(flush_pos > ack_pos); - waitfor_flush[flush_pos].push_back(new C_RetryRead(this)); + waitfor_ack[flush_pos].push_back(new C_RetryRead(this)); return; } @@ -547,7 +572,7 @@ bool Journaler::is_readable() // partial fragment at the end? if (received_pos == write_pos) { dout(10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << dendl; - write_pos = flush_pos = ack_pos = read_pos; + write_pos = flush_pos = ack_pos = safe_pos = read_pos; assert(write_buf.length() == 0); // truncate? diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 10520cc2eda..7daed204358 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -137,13 +137,15 @@ public: __s64 write_pos; // logical write position, where next entry will go __s64 flush_pos; // where we will flush. if write_pos>flush_pos, we're buffering writes. __s64 ack_pos; // what has been acked. + __s64 safe_pos; // what has been committed safely to disk. bufferlist write_buf; // write buffer. flush_pos + write_buf.length() == write_pos. std::map<__s64, utime_t> pending_flush; // start offsets and times for pending flushes - std::map<__s64, std::list > waitfor_flush; // when flushed through given offset + std::map<__s64, std::list > waitfor_ack; // when flushed through given offset + std::map<__s64, std::list > waitfor_safe; // when safe through given offset void _do_flush(); - void _finish_flush(int r, __s64 start); + void _finish_flush(int r, __s64 start, bool safe); class C_Flush; friend class C_Flush; @@ -189,7 +191,7 @@ public: inode(inode_), objecter(obj), filer(objecter), logger(l), lock(lk), timer(*lk), delay_flush_event(0), state(STATE_UNDEF), error(0), - write_pos(0), flush_pos(0), ack_pos(0), + write_pos(0), flush_pos(0), ack_pos(0), safe_pos(0), read_pos(0), requested_pos(0), received_pos(0), fetch_len(fl), prefetch_from(pff), read_bl(0), on_read_finish(0), on_readable(0), @@ -227,7 +229,7 @@ public: // write __s64 append_entry(bufferlist& bl, Context *onsync = 0); - void flush(Context *onsync = 0); + void flush(Context *onsync = 0, Context *onsafe = 0); // read void set_read_pos(__s64 p) {