__s32 op; // paxos op
__s32 machine_id; // which state machine?
+ version_t first_committed; // oldest version i still have stored
version_t last_committed; // i've committed to
version_t pn_from; // i promise to accept after
version_t pn; // with proposal
Message(MSG_MON_PAXOS),
epoch(e),
op(o), machine_id(mid),
- last_committed(0), pn_from(0), pn(0), uncommitted_pn(0) { }
+ first_committed(0), last_committed(0), pn_from(0), pn(0), uncommitted_pn(0) { }
// Returns the constant type-name string "paxos".
const char *get_type_name() { return "paxos"; }
// Write a one-line, parenthesised summary of this message to out:
// the paxos machine name, the op name, then the lc / fc / pn / opn fields.
void print(ostream& out) {
out << "paxos(" << get_paxos_name(machine_id)
- << " " << get_opname(op) << " lc " << last_committed
+ << " " << get_opname(op)
+ << " lc " << last_committed
+ << " fc " << first_committed
<< " pn " << pn << " opn " << uncommitted_pn
<< ")";
}
::encode(epoch, payload);
::encode(op, payload);
::encode(machine_id, payload);
+ ::encode(first_committed, payload);
::encode(last_committed, payload);
::encode(pn_from, payload);
::encode(pn, payload);
::decode(epoch, p);
::decode(op, p);
::decode(machine_id, p);
+ ::decode(first_committed, p);
::decode(last_committed, p);
::decode(pn_from, p);
::decode(pn, p);
// Returns the stored mon_epoch value.
epoch_t get_epoch() { return mon_epoch; }
// Returns the stored leader value.
int get_leader() { return leader; }
// Returns a read-only reference to the quorum set (no copy is made).
const set<int>& get_quorum() { return quorum; }
+  // True when the quorum set has exactly as many members as the monmap.
+  bool is_full_quorum() {
+    if (monmap->size() != quorum.size())
+      return false;
+    return true;
+  }
void call_election(); // initiate election
void win_election(epoch_t epoch, set<int>& q); // end election (called by Elector)
return r == 0;
}
+/*
+ * Remove the store entry <dir>/<a>, or <dir>/<a>/<b> when b is non-NULL.
+ *
+ * The path is built in a std::string so that long directory or key names
+ * cannot overflow a fixed-size buffer.
+ *
+ * Returns whatever ::unlink() returns for the assembled path.
+ */
+int MonitorStore::erase_ss(const char *a, const char *b)
+{
+  std::string fn(dir.c_str());
+  fn += "/";
+  fn += a;
+  if (b) {
+    dout(15) << "erase_ss " << a << "/" << b << dendl;
+    fn += "/";
+    fn += b;
+  } else {
+    dout(15) << "erase_ss " << a << dendl;
+  }
+  return ::unlink(fn.c_str());
+}
int MonitorStore::get_bl_ss(bufferlist& bl, const char *a, const char *b)
{
return put_bl_ss(bl, a, bs);
}
+ int erase_ss(const char *a, const char *b);
+  /*
+   * Remove the entry under a whose name is the decimal form of b
+   * (delegates to erase_ss and returns its result).
+   *
+   * A 64-bit value prints as up to 20 digits, so with the terminating NUL
+   * it needs 21 bytes; the buffer is sized for that and written with
+   * snprintf.  Casting to unsigned long long lets a single %llu format
+   * serve both LP64 and non-LP64 builds.
+   */
+  int erase_sn(const char *a, version_t b) {
+    char bs[24];
+    snprintf(bs, sizeof(bs), "%llu", static_cast<unsigned long long>(b));
+    return erase_ss(a, bs);
+  }
+
/*
version_t get_incarnation() { return get_int("incarnation"); }
void set_incarnation(version_t i) { set_int(i, "incarnation"); }
pg_map.encode(bl);
mon->store->put_bl_ss(bl, "pgmap", "latest");
+ if (mon->is_leader() &&
+ mon->is_full_quorum() &&
+ paxosv > 10)
+ paxos->trim_to(paxosv-10);
+
send_pg_creates();
return true;
last_pn = mon->store->get_int(machine_name, "last_pn");
accepted_pn = mon->store->get_int(machine_name, "accepted_pn");
last_committed = mon->store->get_int(machine_name, "last_committed");
+ first_committed = mon->store->get_int(machine_name, "first_committed");
dout(10) << "init" << dendl;
}
MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id);
lease->last_committed = last_committed;
lease->lease_expire = lease_expire;
+ if (mon->is_full_quorum())
+ lease->first_committed = first_committed;
mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
}
delete lease;
return;
}
-
+
// extend lease
if (lease_expire < lease->lease_expire)
lease_expire = lease->lease_expire;
// ack
MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK, machine_id);
ack->last_committed = last_committed;
+ ack->first_committed = first_committed;
ack->lease_expire = lease_expire;
mon->messenger->send_message(ack, lease->get_source_inst());
mon->timer.cancel_event(lease_timeout_event);
lease_timeout_event = new C_LeaseTimeout(this);
mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_timeout_event);
+
+ // trim?
+ trim_to(lease->first_committed);
// kick waiters
finish_contexts(waiting_for_active);
}
+
+/*
+ * Discard stored states older than 'first'.
+ *
+ * Every version from the current first_committed up to (but excluding)
+ * 'first' is erased from the store, then the new first_committed is
+ * written back.  Nothing happens if first_committed is already >= first.
+ */
+
+void Paxos::trim_to(version_t first)
+{
+  dout(10) << "trim_to " << first << " (was " << first_committed << ")" << dendl;
+
+  // already trimmed this far (or further)
+  if (first <= first_committed)
+    return;
+
+  // the guard above guarantees at least one version to erase
+  do {
+    dout(10) << "trim " << first_committed << dendl;
+    mon->store->erase_sn(machine_name, first_committed);
+  } while (++first_committed < first);
+
+  mon->store->put_int(first_committed, machine_name, "first_committed");
+}
+
/*
* return a globally unique, monotonically increasing proposal number
*/
private:
// recovery (phase 1)
+ version_t first_committed_any;
+ version_t first_committed;
version_t last_pn;
version_t last_committed;
version_t accepted_pn;
void leader_init();
void peon_init();
-
// -- service interface --
// Queue c on waiting_for_active.  Asserts that the machine is not
// currently active, so callers must only use this while inactive.
void wait_for_active(Context *c) {
assert(!is_active());
waiting_for_active.push_back(c);
}
+
+ void trim_to(version_t first);
// read
// Returns last_committed as the current version.
version_t get_version() { return last_committed; }