#define dout_subsys ceph_subsys_paxos
#undef dout_prefix
#define dout_prefix _prefix(_dout, mon, mon->name, mon->rank, machine_name, state, first_committed, last_committed)
-static ostream& _prefix(std::ostream *_dout, Monitor *mon, const string& name, int rank,
- const char *machine_name, int state,
- version_t first_committed, version_t last_committed) {
+static ostream& _prefix(std::ostream *_dout, Monitor *mon, const string& name,
+ int rank, const char *machine_name, int state,
+ version_t first_committed, version_t last_committed)
+{
return *_dout << "mon." << name << "@" << rank
<< "(" << mon->get_state_name() << ")"
<< ".paxos(" << machine_name << " " << Paxos::get_statename(state)
++p) {
if (*p == mon->rank) continue;
- MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, machine_id,
- ceph_clock_now(g_ceph_context));
+ MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT,
+ machine_id, ceph_clock_now(g_ceph_context));
collect->last_committed = last_committed;
collect->first_committed = first_committed;
collect->pn = accepted_pn;
// ok, accept it
accepted_pn = collect->pn;
accepted_pn_from = collect->pn_from;
- dout(10) << "accepting pn " << accepted_pn << " from " << accepted_pn_from << dendl;
+ dout(10) << "accepting pn " << accepted_pn << " from "
+ << accepted_pn_from << dendl;
mon->store->put_int(accepted_pn, machine_name, "accepted_pn");
} else {
// don't accept!
- dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
- << ", we already accepted " << accepted_pn << " from " << accepted_pn_from
- << dendl;
+ dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
+ << ", we already accepted " << accepted_pn
+ << " from " << accepted_pn_from << dendl;
}
last->pn = accepted_pn;
last->pn_from = accepted_pn_from;
if (mon->store->exists_bl_sn(machine_name, last_committed+1)) {
mon->store->get_bl_sn(bl, machine_name, last_committed+1);
assert(bl.length() > 0);
- dout(10) << " sharing our accepted but uncommitted value for " << last_committed+1
- << " (" << bl.length() << " bytes)" << dendl;
+ dout(10) << " sharing our accepted but uncommitted value for "
+ << last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
last->values[last_committed+1] = bl;
last->uncommitted_pn = accepted_pn;
}
collect->put();
}
-void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed, version_t peer_last_committed)
+void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed,
+ version_t peer_last_committed)
{
assert(peer_last_committed < last_committed);
- dout(10) << "share_state peer has fc " << peer_first_committed << " lc " << peer_last_committed << dendl;
+ dout(10) << "share_state peer has fc " << peer_first_committed
+ << " lc " << peer_last_committed << dendl;
version_t v = peer_last_committed + 1;
// start with a stashed full copy?
bufferlist bl;
version_t l = get_stashed(bl);
assert(l <= last_committed);
- dout(10) << "share_state starting with latest " << l << " (" << bl.length() << " bytes)" << dendl;
+ dout(10) << "share_state starting with latest " << l
+ << " (" << bl.length() << " bytes)" << dendl;
m->latest_value.claim(bl);
m->latest_version = l;
v = l;
}
// include incrementals
- for ( ;
- v <= last_committed;
- v++) {
+ for ( ; v <= last_committed; v++) {
if (mon->store->exists_bl_sn(machine_name, v)) {
mon->store->get_bl_sn(m->values[v], machine_name, v);
dout(10) << " sharing " << v << " ("
// stash?
if (m->latest_version && m->latest_version > last_committed) {
- dout(10) << "store_state got stash version " << m->latest_version << ", zapping old states" << dendl;
+ dout(10) << "store_state got stash version "
+ << m->latest_version << ", zapping old states" << dendl;
assert(start != m->values.end() && start->first == m->latest_version);
start = m->values.end();
}
- while (start != m->values.end() &&
- start->first <= last_committed)
+ while (start != m->values.end() && start->first <= last_committed) {
++start;
+ }
map<version_t,bufferlist>::iterator end = start;
- while (end != m->values.end() &&
- end->first <= m->last_committed) {
+ while (end != m->values.end() && end->first <= m->last_committed) {
last_committed = end->first;
if (!first_committed)
first_committed = last_committed;
if (start == end) {
dout(10) << "store_state nothing to commit" << dendl;
} else {
- dout(10) << "store_state [" << start->first << ".." << last_committed << "]" << dendl;
+ dout(10) << "store_state [" << start->first << ".."
+ << last_committed << "]" << dendl;
+
mon->store->put_bl_sn_map(machine_name, start, end);
mon->store->put_int(last_committed, machine_name, "last_committed");
mon->store->put_int(first_committed, machine_name, "first_committed");
if (p->second < last_committed) {
// share committed values
dout(10) << " sending commit to mon." << p->first << dendl;
- MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id,
+ MMonPaxos *commit = new MMonPaxos(mon->get_epoch(),
+ MMonPaxos::OP_COMMIT, machine_id,
ceph_clock_now(g_ceph_context));
share_state(commit, peer_first_committed[p->first], p->second);
mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
if (*p == mon->rank) continue;
dout(10) << " sending begin to mon." << *p << dendl;
- MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN, machine_id,
- ceph_clock_now(g_ceph_context));
+ MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
+ machine_id, ceph_clock_now(g_ceph_context));
begin->values[last_committed+1] = new_value;
begin->last_committed = last_committed;
begin->pn = accepted_pn;
mon->store->put_bl_sn(begin->values[v], machine_name, v);
// reply
- MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, machine_id,
- ceph_clock_now(g_ceph_context));
+ MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
+ machine_id, ceph_clock_now(g_ceph_context));
accept->pn = accepted_pn;
accept->last_committed = last_committed;
mon->messenger->send_message(accept, begin->get_source_inst());
if (*p == mon->rank) continue;
dout(10) << " sending commit to mon." << *p << dendl;
- MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id,
- ceph_clock_now(g_ceph_context));
+ MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
+ machine_id, ceph_clock_now(g_ceph_context));
commit->values[last_committed] = new_value;
commit->pn = accepted_pn;
commit->last_committed = last_committed;
}
-
-
void Paxos::handle_commit(MMonPaxos *commit)
{
dout(10) << "handle_commit on " << commit->last_committed << dendl;
acked_lease.clear();
acked_lease.insert(mon->rank);
- dout(7) << "extend_lease now+" << g_conf->mon_lease << " (" << lease_expire << ")" << dendl;
+ dout(7) << "extend_lease now+" << g_conf->mon_lease
+ << " (" << lease_expire << ")" << dendl;
// bcast
for (set<int>::const_iterator p = mon->get_quorum().begin();
- p != mon->get_quorum().end();
- ++p) {
+ p != mon->get_quorum().end(); ++p) {
+
if (*p == mon->rank) continue;
- MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id,
- ceph_clock_now(g_ceph_context));
+ MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE,
+ machine_id, ceph_clock_now(g_ceph_context));
lease->last_committed = last_committed;
lease->lease_timestamp = lease_expire;
lease->first_committed = first_committed;
// if old timeout is still in place, leave it.
if (!lease_ack_timeout_event) {
lease_ack_timeout_event = new C_LeaseAckTimeout(this);
- mon->timer.add_event_after(g_conf->mon_lease_ack_timeout, lease_ack_timeout_event);
+ mon->timer.add_event_after(g_conf->mon_lease_ack_timeout,
+ lease_ack_timeout_event);
}
// set renew event
// sanity
if (!mon->is_peon() ||
last_committed != lease->last_committed) {
- dout(10) << "handle_lease i'm not a peon, or they're not the leader, or the last_committed doesn't match, dropping" << dendl;
+ dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
+ << " or the last_committed doesn't match, dropping" << dendl;
lease->put();
return;
}
<< " now " << lease_expire << dendl;
// ack
- MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK, machine_id,
- ceph_clock_now(g_ceph_context));
+ MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK,
+ machine_id, ceph_clock_now(g_ceph_context));
ack->last_committed = last_committed;
ack->first_committed = first_committed;
ack->lease_timestamp = ceph_clock_now(g_ceph_context);
int from = ack->get_source().num();
if (!lease_ack_timeout_event) {
- dout(10) << "handle_lease_ack from " << ack->get_source() << " -- stray (probably since revoked)" << dendl;
+ dout(10) << "handle_lease_ack from " << ack->get_source()
+ << " -- stray (probably since revoked)" << dendl;
}
else if (acked_lease.count(from) == 0) {
acked_lease.insert(from);
}
-
/*
* trim old states
*/