default:
assert(0);
- m->put();
return true;
}
}
return prep_auth(op, true);
default:
assert(0);
- m->put();
return false;
}
}
MonSession *s = (MonSession *)m->get_connection()->get_priv();
if (!s) {
dout(10) << "no session, dropping" << dendl;
- m->put();
return true;
}
reply:
reply = new MAuthReply(proto, &response_bl, ret, s->global_id);
mon->send_reply(m, reply);
- m->put();
done:
s->put();
return true;
dout(10) << "AuthMonitor::prepare_global_id" << dendl;
increase_max_global_id();
- //m->put();
return true;
}
dout(10) << __func__ << " " << *m << dendl;
if (!in_quorum()) {
dout(1) << __func__ << " not in quorum -- ignore message" << dendl;
- m->put();
return false;
}
if (!cmd->get_source().is_mon()) {
string rs = ss.str();
mon->reply_command(cmd, ret, rs, rdata, 0);
- } else {
- cmd->put();
}
return (ret == 0);
assert(m->get_service_type() == get_type());
if (!in_quorum()) {
dout(1) << __func__ << " not in quorum -- drop message" << dendl;
- m->put();
return false;
}
assert(0 == "Unknown service op");
break;
}
- m->put();
return true;
}
mon->start_election();
} else {
dout(5) << " ignoring old propose" << dendl;
- m->put();
return;
}
}
dout(5) << "no, we already acked " << leader_acked << dendl;
}
}
-
- m->put();
}
void Elector::handle_ack(MonOpRequestRef op)
dout(5) << "woah, that's a newer epoch, i must have rebooted. bumping and re-starting!" << dendl;
bump_epoch(m->epoch);
start();
- m->put();
return;
}
assert(m->epoch == epoch);
required_features) {
dout(5) << " ignoring ack from mon" << from
<< " without required features" << dendl;
- m->put();
return;
}
// ignore, i'm deferring already.
assert(leader_acked >= 0);
}
-
- m->put();
}
dout(5) << "woah, that's a funny epoch, i must have rebooted. bumping and re-starting!" << dendl;
bump_epoch(m->epoch);
start();
- m->put();
return;
}
mon->get_classic_monitor_commands(&new_cmds, &cmdsize);
mon->set_leader_supported_commands(new_cmds, cmdsize);
}
-
- m->put();
}
void Elector::nak_old_peer(MonOpRequestRef op)
mon->features.encode(reply->sharing_bl);
m->get_connection()->send_message(reply);
}
- m->put();
}
void Elector::handle_nak(MonOpRequestRef op)
case MSG_MON_ELECTION:
{
if (!participating) {
- m->put();
return;
}
if (op->get_req()->get_source().num() >= mon->monmap->size()) {
dout(5) << " ignoring bogus election message with bad mon rank "
- m->put();
<< op->get_req()->get_source() << dendl;
return;
}
if (em->fsid != mon->monmap->fsid) {
dout(0) << " ignoring election msg fsid "
<< em->fsid << " != " << mon->monmap->fsid << dendl;
- m->put();
return;
}
if (!mon->monmap->contains(em->get_source_addr())) {
dout(1) << "discarding election message: " << em->get_source_addr()
<< " not in my monmap " << *mon->monmap << dendl;
- m->put();
return;
}
//mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
cancel_timer();
mon->bootstrap();
- m->put();
delete peermap;
return;
}
if (em->epoch < epoch) {
dout(5) << "old epoch, dropping" << dendl;
- em->put();
break;
}
if (services.count(service_type) == 0) {
dout(1) << __func__ << " service type " << service_type
<< " not registered -- drop message!" << dendl;
- //m->put();
return false;
}
return services[service_type]->service_dispatch(op);
default:
assert(0);
- m->put();
return true;
}
}
return prepare_log(op);
default:
assert(0);
- m->put();
return false;
}
}
return false;
done:
- m->put();
return true;
}
if (m->fsid != mon->monmap->fsid) {
dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid
<< dendl;
- m->put();
return false;
}
MLog *m = static_cast<MLog*>(op->get_req());
dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl;
mon->send_reply(m, new MLogAck(m->fsid, m->entries.rbegin()->seq));
-
- m->put();
}
bool LogMonitor::should_propose(double& delay)
default:
assert(0);
- m->put();
return true;
}
}
if (state != MDSMap::STATE_BOOT) {
dout(7) << "mds_beacon " << *m << " is not in mdsmap" << dendl;
mon->send_reply(m, new MMDSMap(mon->monmap->fsid, &mdsmap));
- m->put();
return true;
} else {
return false; // not booted yet.
mon->send_reply(m,
new MMDSBeacon(mon->monmap->fsid, m->get_global_id(), m->get_name(),
mdsmap.get_epoch(), state, seq));
- m->put();
return true;
ignore:
// I won't reply this beacon, drop it.
mon->no_reply(m);
- m->put();
return true;
}
return false;
done:
- m->put();
return true;
}
default:
assert(0);
- m->put();
}
return true;
info.state = MDSMap::STATE_STANDBY_REPLAY;
info.state_seq = seq;
} else {
- m->put();
return false;
}
} else if (m->get_standby_for_rank() >= 0 &&
info.standby_for_rank = m->get_standby_for_rank();
} else { //it's a standby for anybody, and is already in the list
assert(pending_mdsmap.get_mds_info().count(info.global_id));
- m->put();
return false;
}
} else if (state == MDSMap::STATE_DAMAGED) {
} else {
dout(10) << "prepare_offload_targets " << gid << " not in map" << dendl;
}
- m->put();
return true;
}
m->get_state(),
m->get_seq()));
}
- m->put();
}
void MDSMonitor::on_active()
mm->_updated(op); // success
else if (r == -ECANCELED) {
mm->mon->no_reply(op->get_req<PaxosServiceMessage>());
-// m->put();
} else {
mm->dispatch(op); // try again
}
dout(0) << __func__ << " unknown op " << m->op << dendl;
assert(0 == "unknown op");
}
- m->put();
}
// leader
if (m->fsid != monmap->fsid) {
dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
- m->put();
return;
}
<< ", missing " << (required_features & ~CEPH_FEATURES_ALL)
<< dendl;
break;
-
- default:
- m->put();
}
}
}
out:
- m->put();
+ return;
}
void Monitor::handle_probe_reply(MonOpRequestRef op)
// discover name and addrs during probing or electing states.
if (!is_probing() && !is_electing()) {
- m->put();
return;
}
<< ", mine was " << monmap->get_epoch() << dendl;
delete newmap;
monmap->decode(m->monmap_bl);
- m->put();
bootstrap();
return;
monmap->rename(peer_name, m->name);
if (is_electing()) {
- m->put();
bootstrap();
return;
}
monmap->get_addr(m->name).is_blank_ip()) {
dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
monmap->set_addr(m->name, m->get_source_addr());
- m->put();
bootstrap();
return;
// end discover phase
if (!is_probing()) {
- m->put();
return;
}
if (is_synchronizing()) {
dout(10) << " currently syncing" << dendl;
- m->put();
return;
}
<< dendl;
cancel_probe_timeout();
sync_start(other, true);
- m->put();
return;
}
if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
<< dendl;
cancel_probe_timeout();
sync_start(other, false);
- m->put();
return;
}
}
outside_quorum.insert(m->name);
} else {
dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
- m->put();
return;
}
dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
}
}
- m->put();
}
void Monitor::join_election()
rs = ss.str();
if (!m->get_source().is_mon()) // don't reply to mon->mon commands
reply_command(m, r, rs, 0);
- else
- m->put();
return;
}
out:
if (!m->get_source().is_mon()) // don't reply to mon->mon commands
reply_command(m, r, rs, rdata, 0);
- else
- m->put();
}
void Monitor::reply_command(MMonCommand *m, int rc, const string &rs, version_t version)
reply->set_tid(m->get_tid());
reply->set_data(rdata);
send_reply(m, reply);
- m->put();
}
s->put();
}
session->put();
- m->put();
}
void Monitor::try_send_message(Message *m, const entity_inst_t& to)
dout(0) << "MRoute received from entity without appropriate perms! "
<< dendl;
session->put();
- m->put();
return;
}
if (m->msg)
m->msg = NULL;
}
}
- m->put();
if (session)
session->put();
}
} else {
dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
con->mark_down();
- m->put();
}
}
void Monitor::_ms_dispatch(Message *m)
{
if (is_shutdown()) {
- m->put();
return;
}
} else {
dout(20) << "ms_dispatch existing session " << s << " for " << s->inst << dendl;
}
- op->set_session(s);
assert(s);
if (s->auth_handler) {
never sent by clients to us. */
case MSG_LOGACK:
log_client.handle_log_ack((MLogAck*)op->get_req());
- //m->put();
break;
// monmap
if (!op->is_src_mon() ||
!op->get_session()->is_capable("mon", MON_CAP_X)) {
//can't send these!
- pm->put();
break;
}
// good, thus just drop them and ignore them.
dout(10) << __func__ << " ignore paxos msg from "
<< pm->get_source_inst() << dendl;
- pm->put();
break;
}
// sanitize
if (pm->epoch > get_epoch()) {
bootstrap();
- pm->put();
break;
}
if (pm->epoch != get_epoch()) {
- pm->put();
break;
}
!op->get_session()->is_capable("mon", MON_CAP_X)) {
dout(0) << "MMonElection received from entity without enough caps!"
<< op->get_session()->caps << dendl;
- //m->put();
break;
}
if (!is_probing() && !is_synchronizing()) {
return;
drop:
- //m->put();
return;
}
reply->set_payload(payload);
dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
messenger->send_message(reply, inst);
- m->put();
}
void Monitor::timecheck_start()
} else {
dout(1) << __func__ << " drop unexpected msg" << dendl;
}
- m->put();
}
void Monitor::handle_subscribe(MonOpRequestRef op)
MonSession *s = static_cast<MonSession *>(m->get_connection()->get_priv());
if (!s) {
dout(10) << " no session, dropping" << dendl;
- m->put();
return;
}
m->get_connection()->send_message(new MMonSubscribeAck(monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
s->put();
- m->put();
}
void Monitor::handle_get_version(MonOpRequestRef op)
MonSession *s = static_cast<MonSession *>(m->get_connection()->get_priv());
if (!s) {
dout(10) << " no session, dropping" << dendl;
- m->put();
return;
}
m->get_connection()->send_message(reply);
}
- m->put();
out:
s->put();
MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
dout(10) << "handle_mon_get_map" << dendl;
send_latest_monmap(m->get_connection().get());
- m->put();
}
void Monitor::handle_mon_metadata(MonOpRequestRef op)
dout(10) << __func__ << dendl;
update_mon_metadata(m->get_source().num(), m->data);
}
- m->put();
}
void Monitor::update_mon_metadata(int from, const Metadata& m)
}
break;
}
- m->put();
}
bool Monitor::_scrub(ScrubResult *r,
mon->reply_command(m, rc, rs, rdata, version);
}
else if (r == -ECANCELED)
- m->put();
+ return;
else if (r == -EAGAIN)
mon->dispatch_op(op);
else
class C_RetryMessage : public Context {
Monitor *mon;
MonOpRequestRef op;
- Message *msg;
public:
C_RetryMessage(Monitor *m, MonOpRequestRef o) : mon(m), op(o) {}
void finish(int r) {
return preprocess_join(op);
default:
assert(0);
- m->put();
return true;
}
}
return prepare_join(op);
default:
assert(0);
- m->put();
}
return false;
if (!session ||
!session->is_capable("mon", MON_CAP_W | MON_CAP_X)) {
dout(10) << " insufficient caps" << dendl;
- join->put();
return true;
}
if (pending_map.contains(join->name) && !pending_map.get_addr(join->name).is_blank_ip()) {
dout(10) << " already have " << join->name << dendl;
- join->put();
return true;
}
if (pending_map.contains(join->addr) && pending_map.get_name(join->addr) == join->name) {
dout(10) << " already have " << join->addr << dendl;
- join->put();
return true;
}
return false;
pending_map.remove(pending_map.get_name(join->addr));
pending_map.add(join->name, join->addr);
pending_map.last_changed = ceph_clock_now(g_ceph_context);
- join->put();
return true;
}
default:
assert(0);
- m->put();
return true;
}
}
default:
assert(0);
- m->put();
}
return false;
reply->oldest_map = get_first_committed();
reply->newest_map = osdmap.get_epoch();
mon->send_reply(m, reply);
- m->put();
return true;
}
return false;
didit:
- m->put();
return true;
}
false)); // ACK itself does not request an ack
}
~C_AckMarkedDown() {
- m->put();
}
};
dout(10) << " no failure_info for osd." << target_osd << dendl;
}
mon->no_reply(m);
- m->put();
}
return false;
return false;
ignore:
- m->put();
return true;
}
// does this osd exist?
if (from >= osdmap.get_max_osd()) {
dout(1) << "boot from osd." << from << " >= max_osd " << osdmap.get_max_osd() << dendl;
- m->put();
return false;
}
return false;
ignore:
- m->put();
return true;
}
return true;
ignore:
- m->put();
return true;
}
}
ignore:
- m->put();
return true;
}
}
}
}
-
- m->put();
return true;
}
send_full(m);
else
send_incremental(m, start);
- m->put();
}
MPoolOpReply *reply = new MPoolOpReply(m->fsid, m->get_tid(),
ret, epoch, get_last_committed(), blp);
mon->send_reply(m, reply);
- m->put();
}
struct C_Booted : public Context {
OSDMonitor *cmon;
MonOpRequestRef op;
- // MOSDBoot *m;
bool logit;
C_Booted(OSDMonitor *cm, MonOpRequestRef op_, bool l=true) :
cmon(cm), op(op_), logit(l) {}
cmon->_booted(op, logit);
else if (r == -ECANCELED)
return;
-// m->put();
else if (r == -EAGAIN)
cmon->dispatch(op);
-// cmon->dispatch((PaxosServiceMessage*)m);
else
assert(0 == "bad C_Booted return value");
}
struct C_ReplyMap : public Context {
OSDMonitor *osdmon;
MonOpRequestRef op;
-// PaxosServiceMessage *m;
epoch_t e;
C_ReplyMap(OSDMonitor *o, MonOpRequestRef op_, epoch_t ee)
: osdmon(o), op(op_), e(ee) {}
osdmon->_reply_map(op, e);
else if (r == -ECANCELED)
return;
- //m->put();
else if (r == -EAGAIN)
osdmon->dispatch(op);
else
struct C_PoolOp : public Context {
OSDMonitor *osdmon;
MonOpRequestRef op;
-// MPoolOp *m;
int replyCode;
int epoch;
bufferlist reply_data;
osdmon->_pool_op_reply(op, replyCode, epoch, &reply_data);
else if (r == -ECANCELED)
return;
- //m->put();
else if (r == -EAGAIN)
osdmon->dispatch(op);
else
default:
assert(0);
- m->put();
return true;
}
}
default:
assert(0);
- m->put();
return false;
}
}
// check caps
MonSession *session = statfs->get_session();
if (!session)
- goto out;
+ return;
if (!session->is_capable("pg", MON_CAP_R)) {
dout(0) << "MStatfs received from entity with insufficient privileges "
<< session->caps << dendl;
- goto out;
+ return;
}
MStatfsReply *reply;
if (statfs->fsid != mon->monmap->fsid) {
dout(0) << "handle_statfs on fsid " << statfs->fsid << " != " << mon->monmap->fsid << dendl;
- goto out;
+ return;
}
// fill out stfs
// reply
mon->send_reply(statfs, reply);
- out:
- statfs->put();
}
bool PGMonitor::preprocess_getpoolstats(MonOpRequestRef op)
mon->send_reply(m, reply);
out:
- m->put();
return true;
}
MonSession *session = stats->get_session();
if (!session) {
dout(10) << "PGMonitor::preprocess_pg_stats: no monitor session!" << dendl;
- stats->put();
return true;
}
if (!session->is_capable("pg", MON_CAP_R)) {
derr << "PGMonitor::preprocess_pg_stats: MPGStats received from entity "
<< "with insufficient privileges " << session->caps << dendl;
- stats->put();
return true;
}
if (stats->fsid != mon->monmap->fsid) {
dout(0) << "prepare_pg_stats on fsid " << stats->fsid << " != " << mon->monmap->fsid << dendl;
- stats->put();
return false;
}
!mon->osdmon()->osdmap.is_up(from) ||
stats->get_orig_source_inst() != mon->osdmon()->osdmap.get_inst(from)) {
dout(1) << " ignoring stats from non-active osd." << dendl;
- stats->put();
return false;
}
ack->pg_stat[p->first] = make_pair(p->second.reported_seq, p->second.reported_epoch);
}
mon->send_reply(stats, ack);
- stats->put();
return false;
}
MPGStats *ack = static_cast<MPGStats*>(ack_op->get_req());
dout(7) << "_updated_stats for " << req->get_orig_source_inst() << dendl;
mon->send_reply(req, ack);
- req->put();
}
PGMonitor *pgmon;
MonOpRequestRef stats_op;
MonOpRequestRef stats_op_ack;
-// MPGStats *req;
-// MPGStatsAck *ack;
entity_inst_t who;
C_Stats(PGMonitor *p,
MonOpRequestRef op,
pgmon->_updated_stats(stats_op, stats_op_ack);
} else if (r == -ECANCELED) {
return;
-// req->put();
-// ack->put();
} else if (r == -EAGAIN) {
pgmon->dispatch(stats_op);
-// ack->put();
} else {
assert(0 == "bad C_Stats return value");
}
<< " leader's lowest version is too high for our last committed"
<< " (theirs: " << collect->first_committed
<< "; ours: " << last_committed << ") -- bootstrap!" << dendl;
- collect->put();
mon->bootstrap();
return;
}
// send reply
collect->get_connection()->send_message(last);
- collect->put();
}
/**
if (!mon->is_leader()) {
dout(10) << "not leader, dropping" << dendl;
- last->put();
return;
}
<< " lowest version is too high for our last committed"
<< " (theirs: " << last->first_committed
<< "; ours: " << last_committed << ") -- bootstrap!" << dendl;
- last->put();
mon->bootstrap();
return;
}
<< " last_committed (" << p->second
<< ") is too low for our first_committed (" << first_committed
<< ") -- bootstrap!" << dendl;
- last->put();
mon->bootstrap();
return;
}
if (need_refresh)
(void)do_refresh();
-
- last->put();
}
void Paxos::collect_timeout()
// can we accept this?
if (begin->pn < accepted_pn) {
dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
- begin->put();
return;
}
assert(begin->pn == accepted_pn);
accept->pn = accepted_pn;
accept->last_committed = last_committed;
begin->get_connection()->send_message(accept);
-
- begin->put();
}
// leader
if (accept->pn != accepted_pn) {
// we accepted a higher pn, from some other leader
dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
- goto out;
+ return;
}
if (last_committed > 0 &&
accept->last_committed < last_committed-1) {
dout(10) << " this is from an old round, ignoring" << dendl;
- goto out;
+ return;
}
assert(accept->last_committed == last_committed || // not committed
accept->last_committed == last_committed-1); // committed
dout(10) << " got majority, committing, done with update" << dendl;
commit_start();
}
-
- out:
- accept->put();
}
void Paxos::accept_timeout()
if (!mon->is_peon()) {
dout(10) << "not a peon, dropping" << dendl;
assert(0);
- commit->put();
return;
}
if (do_refresh()) {
finish_contexts(g_ceph_context, waiting_for_commit);
}
-
- commit->put();
}
void Paxos::extend_lease()
last_committed != lease->last_committed) {
dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
<< " or the last_committed doesn't match, dropping" << dendl;
- lease->put();
return;
}
finish_contexts(g_ceph_context, waiting_for_active);
if (is_readable())
finish_contexts(g_ceph_context, waiting_for_readable);
-
- lease->put();
}
void Paxos::handle_lease_ack(MonOpRequestRef op)
}
warn_on_future_time(ack->sent_timestamp, ack->get_source());
-
- ack->put();
}
void Paxos::lease_ack_timeout()
// election in progress?
if (!mon->is_leader() && !mon->is_peon()) {
dout(5) << "election in progress, dropping " << *m << dendl;
- m->put();
return;
}
dout(10) << "dispatch " << *m << " from " << m->get_orig_source_inst() << dendl;
if (mon->is_shutdown()) {
- m->put();
return true;
}
m->rx_election_epoch < mon->get_epoch()) {
dout(10) << " discarding forwarded message from previous election epoch "
<< m->rx_election_epoch << " < " << mon->get_epoch() << dendl;
- m->put();
return true;
}
m->get_connection()->get_messenger() != NULL) {
dout(10) << " discarding message from disconnected client "
<< m->get_source_inst() << " " << *m << dendl;
- m->put();
return true;
}
class C_RetryMessage : public Context {
PaxosService *svc;
MonOpRequestRef op;
-// PaxosServiceMessage *m;
public:
C_RetryMessage(PaxosService *s, MonOpRequestRef op_) : svc(s), op(op_) {}
void finish(int r) {
svc->dispatch(op);
else if (r == -ECANCELED)
return;
-// m->put();
else
assert(0 == "bad C_RetryMessage return value");
}