From 530cc2dbf33196204888e899bc0557bbd59608e0 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 14 Nov 2008 13:48:30 -0800 Subject: [PATCH] mon: commit large numbers of state values quickly Write them all, then sync once at the end. Also include some infrastructure for using the latest stashed value to recover. Don't use it yet, though. The interaction with keeping last_committed and latest stashed values in sync wrt a failure between the two is a bit tricky. --- src/messages/MMonPaxos.h | 16 +++++++++++--- src/mon/MonitorStore.cc | 14 +++++++++--- src/mon/MonitorStore.h | 10 +++++---- src/mon/Paxos.cc | 48 +++++++++++++++++++++++++++++++--------- 4 files changed, 68 insertions(+), 20 deletions(-) diff --git a/src/messages/MMonPaxos.h b/src/messages/MMonPaxos.h index ee9b78f1fd9aa..4c93aedf0c328 100644 --- a/src/messages/MMonPaxos.h +++ b/src/messages/MMonPaxos.h @@ -53,6 +53,9 @@ class MMonPaxos : public Message { version_t uncommitted_pn; // previous pn, if we are a LAST with an uncommitted value utime_t lease_expire; + version_t latest_version; + bufferlist latest_value; + map values; MMonPaxos() : Message(MSG_MON_PAXOS) {} @@ -60,7 +63,8 @@ class MMonPaxos : public Message { Message(MSG_MON_PAXOS), epoch(e), op(o), machine_id(mid), - first_committed(0), last_committed(0), pn_from(0), pn(0), uncommitted_pn(0) { } + first_committed(0), last_committed(0), pn_from(0), pn(0), uncommitted_pn(0), + latest_version(0) { } const char *get_type_name() { return "paxos"; } @@ -69,8 +73,10 @@ class MMonPaxos : public Message { << " " << get_opname(op) << " lc " << last_committed << " fc " << first_committed - << " pn " << pn << " opn " << uncommitted_pn - << ")"; + << " pn " << pn << " opn " << uncommitted_pn; + if (latest_version) + out << " latest " << latest_version << " (" << latest_value.length() << " bytes)"; + out << ")"; } void encode_payload() { @@ -83,6 +89,8 @@ class MMonPaxos : public Message { ::encode(pn, payload); ::encode(uncommitted_pn, payload); ::encode(lease_expire, payload); + ::encode(latest_version, payload); + ::encode(latest_value, payload); ::encode(values, payload); } void decode_payload() { @@ -96,6 +104,8 @@ class MMonPaxos : public Message { ::decode(pn, p); ::decode(uncommitted_pn, p); ::decode(lease_expire, p); + ::decode(latest_version, p); + ::decode(latest_value, p); ::decode(values, p); } }; diff --git a/src/mon/MonitorStore.cc b/src/mon/MonitorStore.cc index 315e02b95cd3a..063c20a277e95 100644 --- a/src/mon/MonitorStore.cc +++ b/src/mon/MonitorStore.cc @@ -92,6 +92,11 @@ int MonitorStore::mkfs() return r; } +void MonitorStore::sync() +{ + dout(10) << "sync" << dendl; + ::sync(); +} version_t MonitorStore::get_int(const char *a, const char *b) { @@ -120,7 +125,7 @@ version_t MonitorStore::get_int(const char *a, const char *b) } -void MonitorStore::put_int(version_t val, const char *a, const char *b) +void MonitorStore::put_int(version_t val, const char *a, const char *b, bool sync) { char fn[200]; sprintf(fn, "%s/%s", dir.c_str(), a); @@ -145,6 +150,8 @@ void MonitorStore::put_int(version_t val, const char *a, const char *b) int fd = ::open(tfn, O_WRONLY|O_CREAT, 0644); assert(fd >= 0); ::write(fd, vs, strlen(vs)); + if (sync) + ::fsync(fd); ::close(fd); ::rename(tfn, fn); } @@ -231,7 +238,7 @@ int MonitorStore::get_bl_ss(bufferlist& bl, const char *a, const char *b) return len; } -int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b) +int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync) { char fn[200]; sprintf(fn, "%s/%s", dir.c_str(), a); @@ -259,7 +266,8 @@ int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b) derr(0) << "put_bl_ss ::write() errored out, errno is " << strerror(errno) << dendl; } - ::fsync(fd); + if (sync) + ::fsync(fd); ::close(fd); ::rename(tfn, fn); diff --git a/src/mon/MonitorStore.h b/src/mon/MonitorStore.h index 5f95cc2bdc665..01bae2bfe7e91 100644 --- a/src/mon/MonitorStore.h +++ b/src/mon/MonitorStore.h @@ -32,15 +32,17 @@ public: int mount(); int umount(); + void sync(); + // ints (stored as ascii) version_t get_int(const char *a, const char *b=0); - void put_int(version_t v, const char *a, const char *b=0); + void put_int(version_t v, const char *a, const char *b=0, bool sync=true); // buffers // ss and sn varieties. bool exists_bl_ss(const char *a, const char *b=0); int get_bl_ss(bufferlist& bl, const char *a, const char *b); - int put_bl_ss(bufferlist& bl, const char *a, const char *b); + int put_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync=true); bool exists_bl_sn(const char *a, version_t b) { char bs[20]; #ifdef __LP64__ @@ -59,14 +61,14 @@ public: #endif return get_bl_ss(bl, a, bs); } - int put_bl_sn(bufferlist& bl, const char *a, version_t b) { + int put_bl_sn(bufferlist& bl, const char *a, version_t b, bool sync=true) { char bs[20]; #ifdef __LP64__ sprintf(bs, "%lu", b); #else sprintf(bs, "%llu", b); #endif - return put_bl_ss(bl, a, bs); + return put_bl_ss(bl, a, bs, sync); } int erase_ss(const char *a, const char *b); diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index 6ca1c7a082196..baa67104ddcb3 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -142,13 +142,31 @@ void Paxos::handle_collect(MMonPaxos *collect) last->pn_from = accepted_pn_from; // and share whatever data we have - for (version_t v = collect->last_committed+1; - v <= last_committed; - v++) { - if (mon->store->exists_bl_sn(machine_name, v)) { - mon->store->get_bl_sn(last->values[v], machine_name, v); - dout(10) << " sharing " << v << " (" - << last->values[v].length() << " bytes)" << dendl; + if (collect->last_committed < last_committed) { + bufferlist bl; + version_t l = get_latest(bl); + assert(l <= last_committed); + + version_t v = collect->last_committed; + + // start with a stashed full copy? + /* hmm. + if (l > v + 10) { + last->latest_value.claim(bl); + last->latest_version = l; + v = l; + } + */ + + // include (remaining) incrementals + for (v++; + v <= last_committed; + v++) { + if (mon->store->exists_bl_sn(machine_name, v)) { + mon->store->get_bl_sn(last->values[v], machine_name, v); + dout(10) << " sharing " << v << " (" + << last->values[v].length() << " bytes)" << dendl; + } } } @@ -187,13 +205,21 @@ void Paxos::handle_last(MMonPaxos *last) // did we receive a committed value? if (last->last_committed > last_committed) { + /* hmm. + if (last->latest_version) { + last_committed = last->latest_value; + dout(10) << "stashing latest full value " << last_committed << dendl; + stash_latest(last_committed, last->latest_value); + } + */ for (version_t v = last_committed+1; v <= last->last_committed; v++) { - mon->store->put_bl_sn(last->values[v], machine_name, v); + mon->store->put_bl_sn(last->values[v], machine_name, v, false); dout(10) << "committing " << v << " " << last->values[v].length() << " bytes" << dendl; } + mon->store->sync(); last_committed = last->last_committed; mon->store->put_int(last_committed, machine_name, "last_committed"); dout(10) << "last_committed now " << last_committed << dendl; @@ -465,8 +491,9 @@ void Paxos::handle_commit(MMonPaxos *commit) assert(p->first == last_committed+1); last_committed = p->first; dout(10) << " storing " << last_committed << " (" << p->second.length() << " bytes)" << dendl; - mon->store->put_bl_sn(p->second, machine_name, last_committed); + mon->store->put_bl_sn(p->second, machine_name, last_committed, false); } + mon->store->sync(); mon->store->put_int(last_committed, machine_name, "last_committed"); delete commit; @@ -878,7 +905,8 @@ version_t Paxos::get_latest(bufferlist& bl) return 0; } bufferlist::iterator p = full.begin(); - ::decode(latest_stashed, p); + version_t v; + ::decode(v, p); ::decode(bl, p); dout(10) << "get_latest v" << latest_stashed << " len " << bl.length() << dendl; return latest_stashed; -- 2.47.3