assert(is_consistent());
}
+void Paxos::init_logger()
+{
+ PerfCountersBuilder pcb(g_ceph_context, "paxos", l_paxos_first, l_paxos_last);
+ pcb.add_u64_counter(l_paxos_start_leader, "start_leader");
+ pcb.add_u64_counter(l_paxos_start_peon, "start_peon");
+ pcb.add_u64_counter(l_paxos_restart, "restart");
+ pcb.add_u64_counter(l_paxos_refresh, "refresh");
+ pcb.add_time_avg(l_paxos_refresh_latency, "refresh_latency");
+ pcb.add_u64_counter(l_paxos_begin, "begin");
+ pcb.add_u64_avg(l_paxos_begin_keys, "begin_keys");
+ pcb.add_u64_avg(l_paxos_begin_bytes, "begin_bytes");
+ pcb.add_time_avg(l_paxos_begin_latency, "begin_latency");
+ pcb.add_u64_counter(l_paxos_commit, "commit");
+ pcb.add_u64_avg(l_paxos_commit_keys, "commit_keys");
+ pcb.add_u64_avg(l_paxos_commit_bytes, "commit_bytes");
+ pcb.add_time_avg(l_paxos_commit_latency, "commit_latency");
+ pcb.add_u64_counter(l_paxos_collect, "collect");
+ pcb.add_u64_avg(l_paxos_collect_keys, "collect_keys");
+ pcb.add_u64_avg(l_paxos_collect_bytes, "collect_bytes");
+ pcb.add_time_avg(l_paxos_collect_latency, "collect_latency");
+ pcb.add_u64_counter(l_paxos_collect_uncommitted, "collect_uncommitted");
+ pcb.add_u64_counter(l_paxos_collect_timeout, "collect_timeout");
+ pcb.add_u64_counter(l_paxos_accept_timeout, "accept_timeout");
+ pcb.add_u64_counter(l_paxos_lease_ack_timeout, "lease_ack_timeout");
+ pcb.add_u64_counter(l_paxos_lease_timeout, "lease_timeout");
+ pcb.add_u64_counter(l_paxos_store_state, "store_state");
+ pcb.add_u64_avg(l_paxos_store_state_keys, "store_state_keys");
+ pcb.add_u64_avg(l_paxos_store_state_bytes, "store_state_bytes");
+ pcb.add_time_avg(l_paxos_store_state_latency, "store_state_latency");
+ pcb.add_u64_counter(l_paxos_share_state, "share_state");
+ pcb.add_u64_avg(l_paxos_share_state_keys, "share_state_keys");
+ pcb.add_u64_avg(l_paxos_share_state_bytes, "share_state_bytes");
+ pcb.add_u64_counter(l_paxos_new_pn, "new_pn");
+ pcb.add_time_avg(l_paxos_new_pn_latency, "new_pn_latency");
+ logger = pcb.create_perf_counters();
+ g_ceph_context->get_perfcounters_collection()->add(logger);
+}
+
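PerfCountersBuilder is constructed with the lower and upper bounds of a contiguous index range, so the l_paxos_* names used in init_logger() have to be declared as a single enum (presumably in Paxos.h, alongside this change). A minimal sketch of that declaration follows; the 45800 base value and the exact ordering are assumptions, not something shown in this diff:

    // Sketch only: index range for the counters built in init_logger().
    // The base value is illustrative; only first < ... < last matters.
    enum {
      l_paxos_first = 45800,
      l_paxos_start_leader,
      l_paxos_start_peon,
      l_paxos_restart,
      l_paxos_refresh,
      l_paxos_refresh_latency,
      l_paxos_begin,
      l_paxos_begin_keys,
      l_paxos_begin_bytes,
      l_paxos_begin_latency,
      l_paxos_commit,
      l_paxos_commit_keys,
      l_paxos_commit_bytes,
      l_paxos_commit_latency,
      l_paxos_collect,
      l_paxos_collect_keys,
      l_paxos_collect_bytes,
      l_paxos_collect_latency,
      l_paxos_collect_uncommitted,
      l_paxos_collect_timeout,
      l_paxos_accept_timeout,
      l_paxos_lease_ack_timeout,
      l_paxos_lease_timeout,
      l_paxos_store_state,
      l_paxos_store_state_keys,
      l_paxos_store_state_bytes,
      l_paxos_store_state_latency,
      l_paxos_share_state,
      l_paxos_share_state_keys,
      l_paxos_share_state_bytes,
      l_paxos_new_pn,
      l_paxos_new_pn_latency,
      l_paxos_last,
    };

create_perf_counters() then allocates a PerfCounters instance spanning that range, and the add(logger) call above makes it visible through the daemon's perf counters collection.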
void Paxos::dump_info(Formatter *f)
{
f->open_object_section("paxos");
<< " pn " << uncommitted_pn
<< " (" << uncommitted_value.length() << " bytes) from myself"
<< dendl;
+
+ logger->inc(l_paxos_collect_uncommitted);
}
// pick new pn
f.flush(*_dout);
*_dout << dendl;
+ logger->inc(l_paxos_collect);
+ logger->inc(l_paxos_collect_keys, t.get_keys());
+ logger->inc(l_paxos_collect_bytes, t.get_bytes());
+ utime_t start = ceph_clock_now(NULL);
+
get_store()->apply_transaction(t);
+
+ utime_t end = ceph_clock_now(NULL);
+ logger->tinc(l_paxos_collect_latency, end - start);
} else {
 // previously we didn't record which pn a value was accepted
 // under!  use the pn value we just had...  :(
 dout(10) << "WARNING: no pending_pn on disk, using previous accepted pn " << previous_pn
<< " and crossing our fingers" << dendl;
last->uncommitted_pn = previous_pn;
}
+
+ logger->inc(l_paxos_collect_uncommitted);
}
// send reply
version_t v = peer_last_committed + 1;
// include incrementals
+ uint64_t bytes = 0;
for ( ; v <= last_committed; v++) {
if (get_store()->exists(get_name(), v)) {
get_store()->get(get_name(), v, m->values[v]);
assert(m->values[v].length());
dout(10) << " sharing " << v << " ("
<< m->values[v].length() << " bytes)" << dendl;
+ bytes += m->values[v].length() + 16;  // key-name overhead: "paxos_" (6 chars) + 10-digit version = 16
}
}
+ logger->inc(l_paxos_share_state);
+ logger->inc(l_paxos_share_state_keys, m->values.size());
+ logger->inc(l_paxos_share_state_bytes, bytes);
m->last_committed = last_committed;
}
dout(10) << "store_state [" << start->first << ".."
<< last_committed << "]" << dendl;
t.put(get_name(), "last_committed", last_committed);
+
// we should apply the state here -- decode every single bufferlist in the
// map and append the transactions to 't'.
map<version_t,bufferlist>::iterator it;
f.flush(*_dout);
*_dout << dendl;
+ logger->inc(l_paxos_store_state);
+ logger->inc(l_paxos_store_state_bytes, t.get_bytes());
+ logger->inc(l_paxos_store_state_keys, t.get_keys());
+ utime_t start = ceph_clock_now(NULL);
+
get_store()->apply_transaction(t);
+ utime_t end = ceph_clock_now(NULL);
+ logger->tinc(l_paxos_store_state_latency, end - start);
+
// refresh first_committed; this txn may have trimmed.
first_committed = get_store()->get(get_name(), "first_committed");
{
dout(1) << "collect timeout, calling fresh election" << dendl;
collect_timeout_event = 0;
+ logger->inc(l_paxos_collect_timeout);
assert(mon->is_leader());
mon->bootstrap();
}
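Each timed path in this patch has the same open-coded shape: bump the event and size counters, grab ceph_clock_now(NULL), call get_store()->apply_transaction(t), then tinc() the elapsed time into the matching *_latency average. Purely as an illustrative restatement of that pattern (this helper is not part of the patch or of Ceph), the same measurement could be expressed as a scope guard:

    // Sketch only: restates the start / apply_transaction / tinc pattern used
    // in the collect, store_state, begin, commit and new_pn paths above.
    // Assumes the same headers as Paxos.cc (PerfCounters, utime_t, ceph_clock_now).
    struct ScopedLatency {
      PerfCounters *logger;
      int idx;                      // e.g. l_paxos_collect_latency
      utime_t start;
      ScopedLatency(PerfCounters *l, int i)
        : logger(l), idx(i), start(ceph_clock_now(NULL)) {}
      ~ScopedLatency() {
        // records count + sum, exactly like the explicit "end - start" above
        logger->tinc(idx, ceph_clock_now(NULL) - start);
      }
    };

The patch itself keeps the explicit start/end variables at each call site.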
// and no value, yet.
assert(new_value.length() == 0);
-
+
// accept it ourselves
accepted.clear();
accepted.insert(mon->rank);
f.flush(*_dout);
*_dout << dendl;
+ logger->inc(l_paxos_begin);
+ logger->inc(l_paxos_begin_keys, t.get_keys());
+ logger->inc(l_paxos_begin_bytes, t.get_bytes());
+ utime_t start = ceph_clock_now(NULL);
+
get_store()->apply_transaction(t);
+ utime_t end = ceph_clock_now(NULL);
+ logger->tinc(l_paxos_begin_latency, end - start);
+
assert(g_conf->paxos_kill_at != 3);
if (mon->get_quorum().size() == 1) {
assert(g_conf->paxos_kill_at != 4);
+ logger->inc(l_paxos_begin);
+
// set state.
state = STATE_UPDATING;
lease_expire = utime_t(); // cancel lease
f.flush(*_dout);
*_dout << dendl;
+ logger->inc(l_paxos_begin_bytes, t.get_bytes());
+ utime_t start = ceph_clock_now(NULL);
+
get_store()->apply_transaction(t);
+ utime_t end = ceph_clock_now(NULL);
+ logger->tinc(l_paxos_begin_latency, end - start);
+
assert(g_conf->paxos_kill_at != 5);
// reply
accept_timeout_event = 0;
assert(mon->is_leader());
assert(is_updating() || is_updating_previous());
+ logger->inc(l_paxos_accept_timeout);
mon->bootstrap();
}
f.flush(*_dout);
*_dout << dendl;
+ logger->inc(l_paxos_commit);
+ logger->inc(l_paxos_commit_keys, t.get_keys());
+ logger->inc(l_paxos_commit_bytes, t.get_bytes());
+ utime_t start = ceph_clock_now(NULL);
+
get_store()->apply_transaction(t);
+ utime_t end = ceph_clock_now(NULL);
+ logger->tinc(l_paxos_commit_latency, end - start);
+
assert(g_conf->paxos_kill_at != 8);
// refresh first_committed; this txn may have trimmed.
{
dout(10) << "handle_commit on " << commit->last_committed << dendl;
+ logger->inc(l_paxos_commit);
+
if (!mon->is_peon()) {
dout(10) << "not a peon, dropping" << dendl;
assert(0);
{
bool need_bootstrap = false;
+ utime_t start = ceph_clock_now(NULL);
+
// make sure we have the latest state loaded up
mon->refresh_from_paxos(&need_bootstrap);
+ utime_t end = ceph_clock_now(NULL);
+ logger->inc(l_paxos_refresh);
+ logger->tinc(l_paxos_refresh_latency, end - start);
+
if (need_bootstrap) {
dout(10) << " doing requested bootstrap" << dendl;
mon->bootstrap();
dout(1) << "lease_ack_timeout -- calling new election" << dendl;
assert(mon->is_leader());
assert(is_active());
-
+ logger->inc(l_paxos_lease_ack_timeout);
lease_ack_timeout_event = 0;
mon->bootstrap();
}
{
dout(1) << "lease_timeout -- calling new election" << dendl;
assert(mon->is_peon());
-
+ logger->inc(l_paxos_lease_timeout);
lease_timeout_event = 0;
mon->bootstrap();
}
f.flush(*_dout);
*_dout << dendl;
+ logger->inc(l_paxos_new_pn);
+ utime_t start = ceph_clock_now(NULL);
+
get_store()->apply_transaction(t);
+ utime_t end = ceph_clock_now(NULL);
+ logger->tinc(l_paxos_new_pn_latency, end - start);
+
dout(10) << "get_new_proposal_number = " << last_pn << dendl;
return last_pn;
}
finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED);
finish_contexts(g_ceph_context, proposals, -ECANCELED);
+ if (logger)
+ g_ceph_context->get_perfcounters_collection()->remove(logger);
+ delete logger;
}
void Paxos::leader_init()
finish_contexts(g_ceph_context, proposals, -EAGAIN);
+ logger->inc(l_paxos_start_leader);
+
if (mon->get_quorum().size() == 1) {
state = STATE_ACTIVE;
return;
finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN);
finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
finish_contexts(g_ceph_context, proposals, -EAGAIN);
+
+ logger->inc(l_paxos_start_peon);
}
void Paxos::restart()
finish_contexts(g_ceph_context, proposals, -EAGAIN);
finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN);
+
+ logger->inc(l_paxos_restart);
}
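Once init_logger() has registered the counters, they show up under a "paxos" section of the monitor's perf dump over the admin socket. A quick way to inspect them; the socket path, daemon id and values shown here are illustrative, and the output is truncated:

    $ ceph --admin-daemon /var/run/ceph/ceph-mon.a.asok perf dump
    {
      "paxos": {
        "start_leader": 1,
        "refresh": 42,
        "refresh_latency": { "avgcount": 42, "sum": 0.087 },
        "commit": 40,
        "commit_keys": { "avgcount": 40, "sum": 160 },
        "commit_bytes": { "avgcount": 40, "sum": 524288 },
        "commit_latency": { "avgcount": 40, "sum": 0.195 },
        ...
      }
    }

The *_latency and *_keys/*_bytes entries added with add_time_avg()/add_u64_avg() are long-running averages: the dump reports a running avgcount and sum, so sum/avgcount gives the mean per operation.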