#include "ConnectionTracker.h"
#include "common/Formatter.h"
+#include "common/dout.h"
+#include "include/ceph_assert.h"
+
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, rank, epoch, version)
+
+// Log-line prefix for this file's dout_prefix: writes the tracker's rank,
+// version and epoch to *_dout and hands back that same stream.
+static std::ostream& _prefix(std::ostream *_dout, int rank, epoch_t epoch, uint64_t version) {
+  std::ostream& out = *_dout;
+  out << "rank: " << rank;
+  out << " version: " << version;
+  out << " ConnectionTracker(" << epoch << ") ";
+  return out;
+}
std::ostream& operator<<(std::ostream&o, const ConnectionReport& c) {
o << "rank=" << c.rank << ",epoch=" << c.epoch << ",version=" << c.epoch_version
void ConnectionTracker::receive_peer_report(const ConnectionTracker& o)
{
+ ldout(cct, 30) << __func__ << dendl;
for (auto& i : o.peer_reports) {
const ConnectionReport& report = i.second;
if (report.rank == rank) continue;
if (report.epoch > existing.epoch ||
(report.epoch == existing.epoch &&
report.epoch_version > existing.epoch_version)) {
+ ldout(cct, 30) << " new peer_report is more updated" << dendl;
+ ldout(cct, 30) << "existing: " << existing << dendl;
+ ldout(cct, 30) << "new: " << report << dendl;
existing = report;
}
}
bool ConnectionTracker::increase_epoch(epoch_t e)
{
+ ldout(cct, 30) << __func__ << " to " << e << dendl;
if (e > epoch) {
my_reports.epoch_version = version = 0;
my_reports.epoch = epoch = e;
// Advance this tracker's version by one, stamp it into my_reports, and store
// the updated my_reports in peer_reports under our own rank.
// Every persist_interval-th version the owner is asked to persist the scores.
void ConnectionTracker::increase_version()
{
+ ldout(cct, 30) << __func__ << " to " << version+1 << dendl;
// empty the stored `encoding` bufferlist before the state changes
// (presumably a cached encoding of this tracker -- confirm)
encoding.clear();
++version;
my_reports.epoch_version = version;
peer_reports[rank] = my_reports;
// NOTE(review): owner is dereferenced unconditionally below, while the
// default and bufferlist constructors set owner(NULL) -- confirm trackers
// built that way never reach this path.
if ((version % persist_interval) == 0 ) {
+ ldout(cct, 30) << version << " % " << persist_interval << " == 0" << dendl;
owner->persist_connectivity_scores();
}
}
// Record that the connection to peer_rank has been alive for units_alive
// (expressed in the same units as half_life).
//
// The peer's entry in my_reports.history is created with a score of 1.0 if
// it does not exist yet, then updated as
//   score = score * (1 - units_alive / (2 * half_life))
//           + units_alive / (2 * half_life)
// and capped at 1.0. The peer is then marked alive in my_reports.current and
// increase_version() is called, which copies my_reports into peer_reports.
void ConnectionTracker::report_live_connection(int peer_rank, double units_alive)
{
+ ldout(cct, 30) << __func__ << " peer_rank: " << peer_rank << " units_alive: " << units_alive << dendl;
+ ldout(cct, 30) << "my_reports before: " << my_reports << dendl;
// we need to "auto-initialize" to 1, do shenanigans
auto i = my_reports.history.find(peer_rank);
if (i == my_reports.history.end()) {
+ ldout(cct, 30) << "couldn't find: " << peer_rank
+ << " in my_reports.history" << "... inserting: "
+ << "(" << peer_rank << ", 1)" << dendl;
auto[j,k] = my_reports.history.insert(std::pair<int,double>(peer_rank,1.0));
i = j;
}
// pscore aliases the map entry, so the assignments below update history in place
double& pscore = i->second;
+ ldout(cct, 30) << "adding new pscore to my_reports" << dendl;
pscore = pscore * (1 - units_alive / (2 * half_life)) +
(units_alive / (2 * half_life));
pscore = std::min(pscore, 1.0);
my_reports.current[peer_rank] = true;
increase_version();
+ ldout(cct, 30) << "my_reports after: " << my_reports << dendl;
}
// Record that the connection to peer_rank has been dead for units_dead
// (expressed in the same units as half_life).
//
// The peer's entry in my_reports.history is created with a score of 1.0 if
// it does not exist yet, then updated as
//   score = score * (1 - units_dead / (2 * half_life))
//           - units_dead / (2 * half_life)
// and floored at 0.0. The peer is then marked not alive in my_reports.current
// and increase_version() is called, which copies my_reports into peer_reports.
void ConnectionTracker::report_dead_connection(int peer_rank, double units_dead)
{
+ ldout(cct, 30) << __func__ << " peer_rank: " << peer_rank << " units_dead: " << units_dead << dendl;
+ ldout(cct, 30) << "my_reports before: " << my_reports << dendl;
// we need to "auto-initialize" to 1, do shenanigans
auto i = my_reports.history.find(peer_rank);
if (i == my_reports.history.end()) {
+ ldout(cct, 30) << "couldn't find: " << peer_rank
+ << " in my_reports.history" << "... inserting: "
+ << "(" << peer_rank << ", 1)" << dendl;
auto[j,k] = my_reports.history.insert(std::pair<int,double>(peer_rank,1.0));
i = j;
}
// pscore aliases the map entry, so the assignments below update history in place
double& pscore = i->second;
+ ldout(cct, 30) << "adding new pscore to my_reports" << dendl;
pscore = pscore * (1 - units_dead / (2 * half_life)) -
(units_dead / (2*half_life));
pscore = std::max(pscore, 0.0);
my_reports.current[peer_rank] = false;
increase_version();
+ ldout(cct, 30) << "my_reports after: " << my_reports << dendl;
}
void ConnectionTracker::get_total_connection_score(int peer_rank, double *rating,
int *live_count) const
{
+ ldout(cct, 30) << __func__ << dendl;
*rating = 0;
*live_count = 0;
double rate = 0;
*live_count = live;
}
+// Switch this tracker over to new_rank. Nothing happens (beyond the first
+// log line) when new_rank already equals our rank. Otherwise the
+// peer_reports entries stored under both the old and the new rank are
+// dropped, rank and my_reports.rank take the new value, and the stored
+// encoding is cleared.
+void ConnectionTracker::notify_rank_changed(int new_rank)
+{
+  ldout(cct, 20) << __func__ << " to " << new_rank << dendl;
+  if (new_rank != rank) {
+    ldout(cct, 20) << "peer_reports before: " << peer_reports << dendl;
+    peer_reports.erase(rank);
+    peer_reports.erase(new_rank);
+    rank = new_rank;
+    my_reports.rank = rank;
+    encoding.clear();
+    ldout(cct, 20) << "peer_reports after: " << peer_reports << dendl;
+  }
+}
+
// React to rank_removed having been removed: log the state before and after,
// clear the stored encoding, and adjust our own rank and my_reports.rank.
// NOTE(review): only part of this body is visible here; the statements
// between the starting_size capture and the rank decrement are not shown.
void ConnectionTracker::notify_rank_removed(int rank_removed)
{
+
+ ldout(cct, 20) << __func__ << " " << rank_removed << dendl;
+
+ // No point removing something that doesn't exist!
// NOTE(review): this early return also skips the rank adjustment further
// down when we hold no report for rank_removed -- confirm that is intended.
+ if (!peer_reports.count(rank_removed)) return;
+
+ ldout(cct, 20) << "my_reports before: " << my_reports << dendl;
+ ldout(cct, 20) << "peer_reports before: " << peer_reports << dendl;
+ ldout(cct, 20) << "my rank before: " << rank << dendl;
+
encoding.clear();
size_t starting_size = my_reports.current.size();
// erase the removed rank from everywhere
--rank;
my_reports.rank = rank;
}
+
+ ldout(cct, 20) << "my rank after: " << rank << dendl;
+ ldout(cct, 20) << "peer_reports after: " << peer_reports << dendl;
+ ldout(cct, 20) << "my_reports after: " << my_reports << dendl;
+
+ //check if the new_rank from monmap is equal to our adjusted rank.
// NOTE(review): new_rank is not a parameter of the signature above (it takes
// only rank_removed) -- confirm where new_rank is declared, or whether the
// signature and its callers need a second argument.
+ ceph_assert(rank == new_rank);
}
void ConnectionTracker::encode(bufferlist &bl) const
int rank;
int persist_interval;
bufferlist encoding;
+ CephContext *cct;
int get_my_rank() const { return rank; }
ConnectionReport *reports(int p);
const ConnectionReport *reports(int p) const;
owner(NULL), rank(-1), persist_interval(10) {
}
// Build a tracker for `rank` that reports to owner `o`.
// hl initialises half_life, persist_i initialises persist_interval, and c is
// stored as cct (the context passed to ldout in the .cc file). epoch and
// version start at 0, and my_reports.rank is set to the given rank.
ConnectionTracker(RankProvider *o, int rank, double hl,
- int persist_i) :
+ int persist_i, CephContext *c) :
epoch(0), version(0),
- half_life(hl), owner(o), rank(rank), persist_interval(persist_i) {
+ half_life(hl), owner(o), rank(rank), persist_interval(persist_i), cct(c) {
my_reports.rank = rank;
}
- ConnectionTracker(const bufferlist& bl) :
+ ConnectionTracker(const bufferlist& bl, CephContext *c) :
epoch(0), version(0),
- half_life(0), owner(NULL), rank(-1)
+ half_life(0), owner(NULL), rank(-1), persist_interval(10), cct(c)
{
auto bi = bl.cbegin();
decode(bi);
// Copy constructor: copies epoch, version, half_life, owner, rank,
// persist_interval and cct, plus peer_reports and my_reports.
// The `encoding` bufferlist is not copied; the new object starts with a
// default-constructed one.
ConnectionTracker(const ConnectionTracker& o) :
epoch(o.epoch), version(o.version),
half_life(o.half_life), owner(o.owner), rank(o.rank),
- persist_interval(o.persist_interval)
+ persist_interval(o.persist_interval), cct(o.cct)
{
peer_reports = o.peer_reports;
my_reports = o.my_reports;
}
// Reset notification: delegates to clear_peer_reports().
void notify_reset() { clear_peer_reports(); }
- void notify_rank_changed(int new_rank) {
- if (new_rank == rank) return;
- peer_reports.erase(rank);
- peer_reports.erase(new_rank);
- my_reports.rank = new_rank;
- rank = new_rank;
- encoding.clear();
- }
+ void notify_rank_changed(int new_rank);
void notify_rank_removed(int rank_removed);
friend std::ostream& operator<<(std::ostream& o, const ConnectionTracker& c);
friend ConnectionReport *get_connection_reports(ConnectionTracker& ct);
void ElectionLogic::bump_epoch(epoch_t e)
{
- ldout(cct, 10) << __func__ << epoch << " to " << e << dendl;
+ ldout(cct, 10) << __func__ << " to " << e << dendl;
ceph_assert(epoch <= e);
epoch = e;
peer_tracker->increase_epoch(e);
void ElectionLogic::connectivity_bump_epoch_in_election(epoch_t mepoch)
{
+ ldout(cct, 30) << __func__ << " to " << mepoch << dendl;
ceph_assert(mepoch > epoch);
bump_epoch(mepoch);
reset_stable_tracker();
void ElectionLogic::receive_propose(int from, epoch_t mepoch,
const ConnectionTracker *ct)
{
+ ldout(cct, 20) << __func__ << " from " << from << dendl;
if (from == elector->get_my_rank()) {
lderr(cct) << "I got a propose from my own rank, hopefully this is startup weirdness,dropping" << dendl;
return;
double ElectionLogic::connectivity_election_score(int rank)
{
+ ldout(cct, 30) << __func__ << " of " << rank << dendl;
if (elector->get_disallowed_leaders().count(rank)) {
return -1;
}
double score;
int liveness;
if (stable_peer_tracker) {
+ ldout(cct, 30) << "stable_peer_tracker exists so using that ..." << dendl;
stable_peer_tracker->get_total_connection_score(rank, &score, &liveness);
} else {
+ ldout(cct, 30) << "stable_peer_tracker does not exists, using peer_tracker ..." << dendl;
peer_tracker->get_total_connection_score(rank, &score, &liveness);
}
return score;
void ElectionLogic::propose_connectivity_handler(int from, epoch_t mepoch,
const ConnectionTracker *ct)
{
+ ldout(cct, 10) << __func__ << " from " << from << " mepoch: "
+ << mepoch << " epoch: " << epoch << dendl;
+ ldout(cct, 30) << "last_election_winner: " << last_election_winner << dendl;
if ((epoch % 2 == 0) &&
last_election_winner != elector->get_my_rank() &&
!elector->is_current_member(from)) {
// To prevent election flapping, peons ignore proposals from out-of-quorum
// peers unless their vote would materially change from the last election
+ ldout(cct, 30) << "Lets see if this out-of-quorum peer is worth it " << dendl;
int best_scorer = 0;
double best_score = 0;
double last_voted_for_score = 0;
+ ldout(cct, 30) << "elector->paxos_size(): " << elector->paxos_size() << dendl;
for (unsigned i = 0; i < elector->paxos_size(); ++i) {
double score = connectivity_election_score(i);
if (score > best_score) {
last_voted_for_score = score;
}
}
+ ldout(cct, 30) << "best_scorer: " << best_scorer << " best_score: " << best_score
+ << " last_voted_for: " << last_voted_for << " last_voted_for_score: "
+ << last_voted_for_score << dendl;
if (best_scorer == last_voted_for ||
(best_score - last_voted_for_score < ignore_propose_margin)) {
// drop this message; it won't change our vote so we defer to leader
+ ldout(cct, 30) << "drop this message; it won't change our vote so we defer to leader " << dendl;
return;
}
}
if (mepoch > epoch) {
+ ldout(cct, 20) << "mepoch > epoch" << dendl;
connectivity_bump_epoch_in_election(mepoch);
} else if (mepoch < epoch) {
// got an "old" propose,
// a mon just started up, call a new election so they can rejoin!
ldout(cct, 5) << " got propose from old epoch, "
<< from << " must have just started" << dendl;
+ ldout(cct, 10) << "triggering new election" << dendl;
// we may be active; make sure we reset things in the monitor appropriately.
elector->trigger_new_election();
} else {
leader_score = connectivity_election_score(leader_acked);
}
- ldout(cct, 30) << "propose from rank=" << from << ", tracker: "
+ ldout(cct, 20) << "propose from rank=" << from << ", tracker: "
<< (stable_peer_tracker ? *stable_peer_tracker : *peer_tracker) << dendl;
- ldout(cct, 10) << "propose from rank=" << from << ",score=" << from_score
+ ldout(cct, 10) << "propose from rank=" << from << ",from_score=" << from_score
<< "; my score=" << my_score
<< "; currently acked " << leader_acked
- << ",score=" << leader_score << dendl;
+ << ",leader_score=" << leader_score << dendl;
bool my_win = (my_score >= 0) && // My score is non-zero; I am allowed to lead
((my_rank < from && my_score >= from_score) || // We have same scores and I have lower rank, or
(from_score > leader_score));
if (my_win) {
+ ldout(cct, 10) << " conditionally I win" << dendl;
// i would win over them.
if (leader_acked >= 0) { // we already acked someone
ceph_assert(leader_score >= from_score); // and they still win, of course
} else {
// wait, i should win!
if (!electing_me) {
+ ldout(cct, 10) << " wait, i should win! triggering new election ..." << dendl;
elector->trigger_new_election();
}
}
} else {
+ ldout(cct, 10) << " conditionally they win" << dendl;
// they would win over me
if (their_win || from == leader_acked) {
if (leader_acked >= 0 && from != leader_acked) {
// we have to make sure our acked leader will ALSO defer to them, or else
// we can't, to maintain guarantees!
+ ldout(cct, 10) << " make sure acked leader defer to: " << from << dendl;
double leader_from_score;
int leader_from_liveness;
leader_peer_tracker->
&leader_leader_liveness);
if ((from < leader_acked && leader_from_score >= leader_leader_score) ||
(leader_from_score > leader_leader_score)) {
+ ldout(cct, 10) << "defering to " << from << dendl;
defer(from);
leader_peer_tracker.reset(new ConnectionTracker(*ct));
} else { // we can't defer to them *this* round even though they should win...
}
}
} else {
+ ldout(cct, 10) << "defering to " << from << dendl;
defer(from);
leader_peer_tracker.reset(new ConnectionTracker(*ct));
}
m->cct),
peer_tracker(this, m->rank,
m->cct->_conf.get_val<uint64_t>("mon_con_tracker_score_halflife"),
- m->cct->_conf.get_val<uint64_t>("mon_con_tracker_persist_interval")),
+ m->cct->_conf.get_val<uint64_t>("mon_con_tracker_persist_interval"), m->cct),
ping_timeout(m->cct->_conf.get_val<double>("mon_elector_ping_timeout")),
PING_DIVISOR(m->cct->_conf.get_val<uint64_t>("mon_elector_ping_divisor")),
mon(m), elector(this) {
void Elector::persist_connectivity_scores()
{
+ dout(20) << __func__ << dendl;
auto t(std::make_shared<MonitorDBStore::Transaction>());
t->put(Monitor::MONITOR_NAME, "connectivity_scores", peer_tracker.get_encoded_bl());
mon->store->apply_transaction(t);
// Build a temporary ConnectionTracker from the encoded buffer tbl (its
// constructor decodes the buffer) and feed it to
// peer_tracker.receive_peer_report(), which takes over any peer report that
// is newer than the one we already hold.
void Elector::assimilate_connection_reports(const bufferlist& tbl)
{
- ConnectionTracker pct(tbl);
+ dout(10) << __func__ << dendl;
// the temporary tracker shares the monitor's CephContext for its logging
+ ConnectionTracker pct(tbl, mon->cct);
peer_tracker.receive_peer_report(pct);
}
}
ConnectionTracker *oct = NULL;
if (m->sharing_bl.length()) {
- oct = new ConnectionTracker(m->sharing_bl);
+ oct = new ConnectionTracker(m->sharing_bl, mon->cct);
}
logic.receive_propose(from, m->epoch, oct);
delete oct;
void Elector::begin_peer_ping(int peer)
{
+ dout(20) << __func__ << " against " << peer << dendl;
if (live_pinging.count(peer)) {
+ dout(20) << peer << " already in live_pinging ... return " << dendl;
return;
}
return;
}
- dout(5) << __func__ << " against " << peer << dendl;
-
peer_tracker.report_live_connection(peer, 0); // init this peer as existing
live_pinging.insert(peer);
dead_pinging.erase(peer);
void Elector::handle_ping(MonOpRequestRef op)
{
MMonPing *m = static_cast<MMonPing*>(op->get_req());
- dout(10) << __func__ << " " << *m << dendl;
-
int prank = mon->monmap->get_rank(m->get_source_addr());
+ dout(20) << __func__ << " from: " << prank << dendl;
begin_peer_ping(prank);
assimilate_connection_reports(m->tracker_bl);
switch(m->op) {
break;
case MMonPing::PING_REPLY:
+
const utime_t& previous_acked = peer_acked_ping[prank];
const utime_t& newest = peer_sent_ping[prank];
+
if (m->stamp > newest && !newest.is_zero()) {
derr << "dropping PING_REPLY stamp " << m->stamp
<< " as it is newer than newest sent " << newest << dendl;
return;
}
+
if (m->stamp > previous_acked) {
+ dout(20) << "m->stamp > previous_acked" << dendl;
peer_tracker.report_live_connection(prank, m->stamp - previous_acked);
peer_acked_ping[prank] = m->stamp;
+ } else{
+ dout(20) << "m->stamp <= previous_acked .. we don't report_live_connection" << dendl;
}
utime_t now = ceph_clock_now();
+ dout(30) << "now: " << now << " m->stamp: " << m->stamp << " ping_timeout: "
+ << ping_timeout << " PING_DIVISOR: " << PING_DIVISOR << dendl;
if (now - m->stamp > ping_timeout / PING_DIVISOR) {
if (!send_peer_ping(prank, &now)) return;
}
}
auto em = op->get_req<MMonElection>();
-
// assume an old message encoding would have matched
if (em->fsid != mon->monmap->fsid) {
dout(0) << " ignoring election msg fsid "
// Log the call and forward to peer_tracker.notify_reset(), which clears the
// tracker's peer reports.
void Elector::notify_clear_peer_state()
{
+ dout(10) << __func__ << dendl;
peer_tracker.notify_reset();
}
void Elector::notify_rank_changed(int new_rank)
{
+ dout(10) << __func__ << " to " << new_rank << dendl;
peer_tracker.notify_rank_changed(new_rank);
live_pinging.erase(new_rank);
dead_pinging.erase(new_rank);
void Elector::notify_rank_removed(int rank_removed)
{
+ dout(10) << __func__ << ": " << rank_removed << dendl;
peer_tracker.notify_rank_removed(rank_removed);
/* we have to clean up the pinging state, which is annoying
because it's not indexed anywhere (and adding indexing
Owner(int r, ElectionLogic::election_strategy es, double tracker_halflife,
Election *p) : parent(p), rank(r), persisted_epoch(0),
ever_joined(false),
- peer_tracker(this, rank, tracker_halflife, 5),
+ peer_tracker(this, rank, tracker_halflife, 5, g_ceph_context),
logic(this, es, &peer_tracker, 0.0005, g_ceph_context),
victory_accepters(0),
timer_steps(-1), timer_election(true) {