Write them all, then sync once at the end.
Also include some infrastructure for using the latest stashed value
to recover. Don't use it yet, though. The interaction with
keeping last_committed and latest stashed values in sync wrt a
failure between the two is a bit tricky.
version_t uncommitted_pn; // previous pn, if we are a LAST with an uncommitted value
utime_t lease_expire;
+ version_t latest_version;
+ bufferlist latest_value;
+
map<version_t,bufferlist> values;
MMonPaxos() : Message(MSG_MON_PAXOS) {}
Message(MSG_MON_PAXOS),
epoch(e),
op(o), machine_id(mid),
- first_committed(0), last_committed(0), pn_from(0), pn(0), uncommitted_pn(0) { }
+ first_committed(0), last_committed(0), pn_from(0), pn(0), uncommitted_pn(0),
+ latest_version(0) { }
const char *get_type_name() { return "paxos"; }
<< " " << get_opname(op)
<< " lc " << last_committed
<< " fc " << first_committed
- << " pn " << pn << " opn " << uncommitted_pn
- << ")";
+ << " pn " << pn << " opn " << uncommitted_pn;
+ if (latest_version)
+ out << " latest " << latest_version << " (" << latest_value.length() << " bytes)";
+ out << ")";
}
void encode_payload() {
::encode(pn, payload);
::encode(uncommitted_pn, payload);
::encode(lease_expire, payload);
+ ::encode(latest_version, payload);
+ ::encode(latest_value, payload);
::encode(values, payload);
}
void decode_payload() {
::decode(pn, p);
::decode(uncommitted_pn, p);
::decode(lease_expire, p);
+ ::decode(latest_version, p);
+ ::decode(latest_value, p);
::decode(values, p);
}
};
return r;
}
+void MonitorStore::sync()
+{
+ dout(10) << "sync" << dendl;
+ ::sync();
+}
version_t MonitorStore::get_int(const char *a, const char *b)
{
}
-void MonitorStore::put_int(version_t val, const char *a, const char *b)
+void MonitorStore::put_int(version_t val, const char *a, const char *b, bool sync)
{
char fn[200];
sprintf(fn, "%s/%s", dir.c_str(), a);
int fd = ::open(tfn, O_WRONLY|O_CREAT, 0644);
assert(fd >= 0);
::write(fd, vs, strlen(vs));
+ if (sync)
+ ::fsync(fd);
::close(fd);
::rename(tfn, fn);
}
return len;
}
-int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b)
+int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync)
{
char fn[200];
sprintf(fn, "%s/%s", dir.c_str(), a);
derr(0) << "put_bl_ss ::write() errored out, errno is " << strerror(errno) << dendl;
}
- ::fsync(fd);
+ if (sync)
+ ::fsync(fd);
::close(fd);
::rename(tfn, fn);
int mount();
int umount();
+ void sync();
+
// ints (stored as ascii)
version_t get_int(const char *a, const char *b=0);
- void put_int(version_t v, const char *a, const char *b=0);
+ void put_int(version_t v, const char *a, const char *b=0, bool sync=true);
// buffers
// ss and sn varieties.
bool exists_bl_ss(const char *a, const char *b=0);
int get_bl_ss(bufferlist& bl, const char *a, const char *b);
- int put_bl_ss(bufferlist& bl, const char *a, const char *b);
+ int put_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync=true);
bool exists_bl_sn(const char *a, version_t b) {
char bs[20];
#ifdef __LP64__
#endif
return get_bl_ss(bl, a, bs);
}
- int put_bl_sn(bufferlist& bl, const char *a, version_t b) {
+ int put_bl_sn(bufferlist& bl, const char *a, version_t b, bool sync=true) {
char bs[20];
#ifdef __LP64__
sprintf(bs, "%lu", b);
#else
sprintf(bs, "%llu", b);
#endif
- return put_bl_ss(bl, a, bs);
+ return put_bl_ss(bl, a, bs, sync);
}
int erase_ss(const char *a, const char *b);
last->pn_from = accepted_pn_from;
// and share whatever data we have
- for (version_t v = collect->last_committed+1;
- v <= last_committed;
- v++) {
- if (mon->store->exists_bl_sn(machine_name, v)) {
- mon->store->get_bl_sn(last->values[v], machine_name, v);
- dout(10) << " sharing " << v << " ("
- << last->values[v].length() << " bytes)" << dendl;
+ if (collect->last_committed < last_committed) {
+ bufferlist bl;
+ version_t l = get_latest(bl);
+ assert(l <= last_committed);
+
+ version_t v = collect->last_committed;
+
+ // start with a stashed full copy?
+ /* hmm.
+ if (l > v + 10) {
+ last->latest_value.claim(bl);
+ last->latest_version = l;
+ v = l;
+ }
+ */
+
+ // include (remaining) incrementals
+ for (v++;
+ v <= last_committed;
+ v++) {
+ if (mon->store->exists_bl_sn(machine_name, v)) {
+ mon->store->get_bl_sn(last->values[v], machine_name, v);
+ dout(10) << " sharing " << v << " ("
+ << last->values[v].length() << " bytes)" << dendl;
+ }
}
}
// did we receive a committed value?
if (last->last_committed > last_committed) {
+ /* hmm.
+ if (last->latest_version) {
+ last_committed = last->latest_value;
+ dout(10) << "stashing latest full value " << last_committed << dendl;
+ stash_latest(last_committed, last->latest_value);
+ }
+ */
for (version_t v = last_committed+1;
v <= last->last_committed;
v++) {
- mon->store->put_bl_sn(last->values[v], machine_name, v);
+ mon->store->put_bl_sn(last->values[v], machine_name, v, false);
dout(10) << "committing " << v << " "
<< last->values[v].length() << " bytes" << dendl;
}
+ mon->store->sync();
last_committed = last->last_committed;
mon->store->put_int(last_committed, machine_name, "last_committed");
dout(10) << "last_committed now " << last_committed << dendl;
assert(p->first == last_committed+1);
last_committed = p->first;
dout(10) << " storing " << last_committed << " (" << p->second.length() << " bytes)" << dendl;
- mon->store->put_bl_sn(p->second, machine_name, last_committed);
+ mon->store->put_bl_sn(p->second, machine_name, last_committed, false);
}
+ mon->store->sync();
mon->store->put_int(last_committed, machine_name, "last_committed");
delete commit;
return 0;
}
bufferlist::iterator p = full.begin();
- ::decode(latest_stashed, p);
+ version_t v;
+ ::decode(v, p);
::decode(bl, p);
dout(10) << "get_latest v" << latest_stashed << " len " << bl.length() << dendl;
return latest_stashed;