return 0;
}
-bool AuthMonitor::preprocess_query(PaxosServiceMessage *m)
+bool AuthMonitor::preprocess_query(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case MSG_MON_COMMAND:
- return preprocess_command((MMonCommand*)m);
+ return preprocess_command(op);
case CEPH_MSG_AUTH:
- return prep_auth((MAuth *)m, false);
+ return prep_auth(op, false);
case MSG_MON_GLOBAL_ID:
return false;
}
}
-bool AuthMonitor::prepare_update(PaxosServiceMessage *m)
+bool AuthMonitor::prepare_update(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case MSG_MON_COMMAND:
- return prepare_command((MMonCommand*)m);
+ return prepare_command(op);
case MSG_MON_GLOBAL_ID:
- return prepare_global_id((MMonGlobalID*)m);
+ return prepare_global_id(op);
case CEPH_MSG_AUTH:
- return prep_auth((MAuth *)m, true);
+ return prep_auth(op, true);
default:
assert(0);
m->put();
}
}
-uint64_t AuthMonitor::assign_global_id(MAuth *m, bool should_increase_max)
+uint64_t AuthMonitor::assign_global_id(MonOpRequestRef op, bool should_increase_max)
{
+ MAuth *m = static_cast<MAuth*>(op->get_req());
int total_mon = mon->monmap->size();
dout(10) << "AuthMonitor::assign_global_id m=" << *m << " mon=" << mon->rank << "/" << total_mon
<< " last_allocated=" << last_allocated_id << " max_global_id=" << max_global_id << dendl;
}
-bool AuthMonitor::prep_auth(MAuth *m, bool paxos_writable)
+bool AuthMonitor::prep_auth(MonOpRequestRef op, bool paxos_writable)
{
+ MAuth *m = static_cast<MAuth*>(op->get_req());
dout(10) << "prep_auth() blob_size=" << m->get_auth_payload().length() << dendl;
MonSession *s = (MonSession *)m->get_connection()->get_priv();
request. If a client tries to send it later, it'll screw up its auth
session */
if (!s->global_id) {
- s->global_id = assign_global_id(m, paxos_writable);
+ s->global_id = assign_global_id(op, paxos_writable);
if (!s->global_id) {
delete s->auth_handler;
if (mon->is_leader() && paxos_writable) {
dout(10) << "increasing global id, waitlisting message" << dendl;
- wait_for_active(new C_RetryMessage(this, m));
+ wait_for_active(new C_RetryMessage(this, op));
goto done;
}
MMonGlobalID *req = new MMonGlobalID();
req->old_max_id = max_global_id;
mon->messenger->send_message(req, mon->monmap->get_inst(leader));
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
return true;
}
ret = s->auth_handler->handle_request(indata, response_bl, s->global_id, caps_info, &auid);
}
if (ret == -EIO) {
- wait_for_active(new C_RetryMessage(this,m));
+ wait_for_active(new C_RetryMessage(this,op));
goto done;
}
if (caps_info.caps.length()) {
return true;
}
-bool AuthMonitor::preprocess_command(MMonCommand *m)
+bool AuthMonitor::preprocess_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
int r = -1;
bufferlist rdata;
stringstream ss, ds;
}
}
-bool AuthMonitor::prepare_command(MMonCommand *m)
+bool AuthMonitor::prepare_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
stringstream ss, ds;
bufferlist rdata;
string rs;
ss << "imported keyring";
getline(ss, rs);
err = 0;
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if (prefix == "auth add" && !entity_name.empty()) {
if (inc.op == KeyServerData::AUTH_INC_ADD &&
inc.name == entity) {
wait_for_finished_proposal(
- new Monitor::C_Command(mon, m, 0, rs, get_last_committed() + 1));
+ new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1));
return true;
}
}
ss << "added key for " << auth_inc.name;
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if ((prefix == "auth get-or-create-key" ||
::decode(auth_inc, q);
if (auth_inc.op == KeyServerData::AUTH_INC_ADD &&
auth_inc.name == entity) {
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
rdata.append(ds);
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, rdata,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, rdata,
get_last_committed() + 1));
return true;
} else if (prefix == "auth caps" && !entity_name.empty()) {
ss << "updated caps for " << auth_inc.name;
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if (prefix == "auth del" && !entity_name.empty()) {
ss << "updated";
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
return false;
}
-bool AuthMonitor::prepare_global_id(MMonGlobalID *m)
+bool AuthMonitor::prepare_global_id(MonOpRequestRef op)
{
dout(10) << "AuthMonitor::prepare_global_id" << dendl;
increase_max_global_id();
- m->put();
+ //m->put();
return true;
}
void create_initial();
void update_from_paxos(bool *need_bootstrap);
void create_pending(); // prepare a new pending
- bool prepare_global_id(MMonGlobalID *m);
+ bool prepare_global_id(MonOpRequestRef op);
void increase_max_global_id();
- uint64_t assign_global_id(MAuth *m, bool should_increase_max);
+ uint64_t assign_global_id(MonOpRequestRef op, bool should_increase_max);
// propose pending update to peers
void encode_pending(MonitorDBStore::TransactionRef t);
virtual void encode_full(MonitorDBStore::TransactionRef t);
version_t get_trim_to();
- bool preprocess_query(PaxosServiceMessage *m); // true if processed.
- bool prepare_update(PaxosServiceMessage *m);
+ bool preprocess_query(MonOpRequestRef op); // true if processed.
+ bool prepare_update(MonOpRequestRef op);
- bool prep_auth(MAuth *m, bool paxos_writable);
+ bool prep_auth(MonOpRequestRef op, bool paxos_writable);
- bool preprocess_command(MMonCommand *m);
- bool prepare_command(MMonCommand *m);
+ bool preprocess_command(MonOpRequestRef op);
+ bool prepare_command(MonOpRequestRef op);
bool check_rotate();
public:
}
-bool ConfigKeyService::service_dispatch(Message *m)
+bool ConfigKeyService::service_dispatch(MonOpRequestRef op)
{
+ Message *m = op->get_req();
dout(10) << __func__ << " " << *m << dendl;
if (!in_quorum()) {
dout(1) << __func__ << " not in quorum -- ignore message" << dendl;
}
// we'll reply to the message once the proposal has been handled
store_put(key, data,
- new Monitor::C_Command(mon, cmd, 0, "value stored", 0));
+ new Monitor::C_Command(mon, op, 0, "value stored", 0));
// return for now; we'll put the message once it's done.
return true;
ss << "no such key '" << key << "'";
goto out;
}
- store_delete(key, new Monitor::C_Command(mon, cmd, 0, "key deleted", 0));
+ store_delete(key, new Monitor::C_Command(mon, op, 0, "key deleted", 0));
// return for now; we'll put the message once it's done
return true;
virtual void get_health(Formatter *f,
list<pair<health_status_t,string> >& summary,
list<pair<health_status_t,string> > *detail) { }
- virtual bool service_dispatch(Message *m);
+ virtual bool service_dispatch(MonOpRequestRef op);
virtual void start_epoch() { }
virtual void finish_epoch() { }
}
}
-void DataHealthService::handle_tell(MMonHealth *m)
+void DataHealthService::handle_tell(MonOpRequestRef op)
{
+ MMonHealth *m = static_cast<MMonHealth*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
assert(m->get_service_op() == MMonHealth::OP_TELL);
stats[m->get_source_inst()] = m->data_stats;
}
-bool DataHealthService::service_dispatch(MMonHealth *m)
+bool DataHealthService::service_dispatch_op(MonOpRequestRef op)
{
+ MMonHealth *m = static_cast<MMonHealth*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
assert(m->get_service_type() == get_type());
if (!in_quorum()) {
switch (m->service_op) {
case MMonHealth::OP_TELL:
// someone is telling us their stats
- handle_tell(m);
+ handle_tell(op);
break;
default:
dout(0) << __func__ << " unknown op " << m->service_op << dendl;
map<entity_inst_t,DataStats> stats;
int last_warned_percent;
- void handle_tell(MMonHealth *m);
+ void handle_tell(MonOpRequestRef op);
int update_store_stats(DataStats &ours);
int update_stats();
void share_stats();
protected:
virtual void service_tick();
- virtual bool service_dispatch(Message *m) {
- assert(0 == "We should never reach this; only the function below");
- return false;
- }
- virtual bool service_dispatch(MMonHealth *m);
+ virtual bool service_dispatch_op(MonOpRequestRef op);
virtual void service_shutdown() { }
virtual void start_epoch();
}
-void Elector::handle_propose(MMonElection *m)
+void Elector::handle_propose(MonOpRequestRef op)
{
+ MMonElection *m = static_cast<MMonElection*>(op->get_req());
dout(5) << "handle_propose from " << m->get_source() << dendl;
int from = m->get_source().num();
required_features) {
dout(5) << " ignoring propose from mon" << from
<< " without required features" << dendl;
- nak_old_peer(m);
+ nak_old_peer(op);
return;
} else if (m->epoch > epoch) {
bump_epoch(m->epoch);
m->put();
}
-void Elector::handle_ack(MMonElection *m)
+void Elector::handle_ack(MonOpRequestRef op)
{
+ MMonElection *m = static_cast<MMonElection*>(op->get_req());
dout(5) << "handle_ack from " << m->get_source() << dendl;
int from = m->get_source().num();
}
-void Elector::handle_victory(MMonElection *m)
+void Elector::handle_victory(MonOpRequestRef op)
{
+ MMonElection *m = static_cast<MMonElection*>(op->get_req());
dout(5) << "handle_victory from " << m->get_source() << " quorum_features " << m->quorum_features << dendl;
int from = m->get_source().num();
m->put();
}
-void Elector::nak_old_peer(MMonElection *m)
+void Elector::nak_old_peer(MonOpRequestRef op)
{
+ MMonElection *m = static_cast<MMonElection*>(op->get_req());
uint64_t supported_features = m->get_connection()->get_features();
if (supported_features & CEPH_FEATURE_OSDMAP_ENC) {
m->put();
}
-void Elector::handle_nak(MMonElection *m)
+void Elector::handle_nak(MonOpRequestRef op)
{
+ MMonElection *m = static_cast<MMonElection*>(op->get_req());
dout(1) << "handle_nak from " << m->get_source()
<< " quorum_features " << m->quorum_features << dendl;
// the end!
}
-void Elector::dispatch(Message *m)
+void Elector::dispatch(MonOpRequestRef op)
{
- switch (m->get_type()) {
+ switch (op->get_req()->get_type()) {
case MSG_MON_ELECTION:
{
m->put();
return;
}
- if (m->get_source().num() >= mon->monmap->size()) {
+ if (op->get_req()->get_source().num() >= mon->monmap->size()) {
dout(5) << " ignoring bogus election message with bad mon rank "
- << m->get_source() << dendl;
m->put();
+ << op->get_req()->get_source() << dendl;
return;
}
- MMonElection *em = static_cast<MMonElection*>(m);
+ MMonElection *em = static_cast<MMonElection*>(op->get_req());
// assume an old message encoding would have matched
if (em->fsid != mon->monmap->fsid) {
return;
}
- if (!mon->monmap->contains(m->get_source_addr())) {
- dout(1) << "discarding election message: " << m->get_source_addr()
+ if (!mon->monmap->contains(em->get_source_addr())) {
+ dout(1) << "discarding election message: " << em->get_source_addr()
<< " not in my monmap " << *mon->monmap << dendl;
m->put();
return;
MonMap *peermap = new MonMap;
peermap->decode(em->monmap_bl);
if (peermap->epoch > mon->monmap->epoch) {
- dout(0) << m->get_source_inst() << " has newer monmap epoch " << peermap->epoch
+ dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap->epoch
<< " > my epoch " << mon->monmap->epoch
<< ", taking it"
<< dendl;
return;
}
if (peermap->epoch < mon->monmap->epoch) {
- dout(0) << m->get_source_inst() << " has older monmap epoch " << peermap->epoch
+ dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap->epoch
<< " < my epoch " << mon->monmap->epoch
<< dendl;
}
switch (em->op) {
case MMonElection::OP_PROPOSE:
- handle_propose(em);
+ handle_propose(op);
return;
}
switch (em->op) {
case MMonElection::OP_ACK:
- handle_ack(em);
+ handle_ack(op);
return;
case MMonElection::OP_VICTORY:
- handle_victory(em);
+ handle_victory(op);
return;
case MMonElection::OP_NAK:
- handle_nak(em);
+ handle_nak(op);
return;
default:
assert(0);
#include "include/Context.h"
#include "common/Timer.h"
+#include "mon/MonOpRequest.h"
class Monitor;
*
* @param m A message sent by another participant in the quorum.
*/
- void handle_propose(class MMonElection *m);
+ void handle_propose(MonOpRequestRef op);
/**
* Handle a message from some other participant Acking us as the Leader.
*
*
* @param m A message with an operation type of OP_ACK
*/
- void handle_ack(class MMonElection *m);
+ void handle_ack(MonOpRequestRef op);
/**
* Handle a message from some other participant declaring Victory.
*
*
* @param m A message with an operation type of OP_VICTORY
*/
- void handle_victory(class MMonElection *m);
+ void handle_victory(MonOpRequestRef op);
/**
* Send a nak to a peer who's out of date, containing information about why.
*
* @param m A message from a monitor not supporting required features. We
* take ownership of the reference.
*/
- void nak_old_peer(class MMonElection *m);
+ void nak_old_peer(MonOpRequestRef op);
/**
* Handle a message from some other participant declaring
* we cannot join the quorum.
*
* @param m A message with an operation type of OP_NAK
*/
- void handle_nak(class MMonElection *m);
+ void handle_nak(MonOpRequestRef op);
public:
/**
*
* @param m A received message
*/
- void dispatch(Message *m);
+ void dispatch(MonOpRequestRef op);
/**
* Call an election.
}
}
-bool HealthMonitor::service_dispatch(Message *m)
+bool HealthMonitor::service_dispatch(MonOpRequestRef op)
{
- assert(m->get_type() == MSG_MON_HEALTH);
- MMonHealth *hm = (MMonHealth*)m;
+ assert(op->get_req()->get_type() == MSG_MON_HEALTH);
+ MMonHealth *hm = static_cast<MMonHealth*>(op->get_req());
int service_type = hm->get_service_type();
if (services.count(service_type) == 0) {
dout(1) << __func__ << " service type " << service_type
<< " not registered -- drop message!" << dendl;
- m->put();
+ //m->put();
return false;
}
- return services[service_type]->service_dispatch(hm);
+ return services[service_type]->service_dispatch(op);
}
void HealthMonitor::service_shutdown()
virtual void get_health(Formatter *f,
list<pair<health_status_t,string> >& summary,
list<pair<health_status_t,string> > *detail);
- virtual bool service_dispatch(Message *m);
+ virtual bool service_dispatch(MonOpRequestRef op);
virtual void start_epoch() {
for (map<int,HealthService*>::iterator it = services.begin();
HealthService(Monitor *m) : QuorumService(m) { }
virtual ~HealthService() { }
- virtual bool service_dispatch(Message *m) {
- return service_dispatch(static_cast<MMonHealth*>(m));
+ virtual bool service_dispatch(MonOpRequestRef op) {
+ return service_dispatch_op(op);
}
- virtual bool service_dispatch(MMonHealth *m) = 0;
+ virtual bool service_dispatch_op(MonOpRequestRef op) = 0;
public:
virtual void get_health(Formatter *f,
return 0;
}
-bool LogMonitor::preprocess_query(PaxosServiceMessage *m)
+bool LogMonitor::preprocess_query(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case MSG_MON_COMMAND:
- return preprocess_command(static_cast<MMonCommand*>(m));
+ return preprocess_command(op);
case MSG_LOG:
- return preprocess_log((MLog*)m);
+ return preprocess_log(op);
default:
assert(0);
}
}
-bool LogMonitor::prepare_update(PaxosServiceMessage *m)
+bool LogMonitor::prepare_update(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case MSG_MON_COMMAND:
- return prepare_command(static_cast<MMonCommand*>(m));
+ return prepare_command(op);
case MSG_LOG:
- return prepare_log((MLog*)m);
+ return prepare_log(op);
default:
assert(0);
m->put();
}
}
-bool LogMonitor::preprocess_log(MLog *m)
+bool LogMonitor::preprocess_log(MonOpRequestRef op)
{
+ MLog *m = static_cast<MLog*>(op->get_req());
dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
int num_new = 0;
return true;
}
-bool LogMonitor::prepare_log(MLog *m)
+bool LogMonitor::prepare_log(MonOpRequestRef op)
{
+ MLog *m = static_cast<MLog*>(op->get_req());
dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl;
if (m->fsid != mon->monmap->fsid) {
pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p));
}
}
- wait_for_finished_proposal(new C_Log(this, m));
+ wait_for_finished_proposal(new C_Log(this, op));
return true;
}
-void LogMonitor::_updated_log(MLog *m)
+void LogMonitor::_updated_log(MonOpRequestRef op)
{
+ MLog *m = static_cast<MLog*>(op->get_req());
dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl;
mon->send_reply(m, new MLogAck(m->fsid, m->entries.rbegin()->seq));
}
-bool LogMonitor::preprocess_command(MMonCommand *m)
+bool LogMonitor::preprocess_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
int r = -1;
bufferlist rdata;
stringstream ss;
}
-bool LogMonitor::prepare_command(MMonCommand *m)
+bool LogMonitor::prepare_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
stringstream ss;
string rs;
int err = -EINVAL;
le.msg = str_join(logtext, " ");
pending_summary.add(le);
pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le));
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, string(),
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, string(),
get_last_committed() + 1));
return true;
}
void encode_pending(MonitorDBStore::TransactionRef t);
virtual void encode_full(MonitorDBStore::TransactionRef t);
version_t get_trim_to();
- bool preprocess_query(PaxosServiceMessage *m); // true if processed.
- bool prepare_update(PaxosServiceMessage *m);
+ bool preprocess_query(MonOpRequestRef op); // true if processed.
+ bool prepare_update(MonOpRequestRef op);
- bool preprocess_log(MLog *m);
- bool prepare_log(MLog *m);
- void _updated_log(MLog *m);
+ bool preprocess_log(MonOpRequestRef op);
+ bool prepare_log(MonOpRequestRef op);
+ void _updated_log(MonOpRequestRef op);
bool should_propose(double& delay);
struct C_Log : public Context {
LogMonitor *logmon;
- MLog *ack;
- C_Log(LogMonitor *p, MLog *a) : logmon(p), ack(a) {}
+ MonOpRequestRef op;
+ C_Log(LogMonitor *p, MonOpRequestRef o) : logmon(p), op(o) {}
void finish(int r) {
if (r == -ECANCELED) {
- if (ack)
- ack->put();
return;
}
- logmon->_updated_log(ack);
+ logmon->_updated_log(op);
}
};
- bool preprocess_command(MMonCommand *m);
- bool prepare_command(MMonCommand *m);
+ bool preprocess_command(MonOpRequestRef op);
+ bool prepare_command(MonOpRequestRef op);
bool _create_sub_summary(MLog *mlog, int level);
void _create_sub_incremental(MLog *mlog, int level, version_t sv);
mon->cluster_logger->set(l_cluster_mds_epoch, mdsmap.get_epoch());
}
-bool MDSMonitor::preprocess_query(PaxosServiceMessage *m)
+bool MDSMonitor::preprocess_query(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case MSG_MDS_BEACON:
- return preprocess_beacon(static_cast<MMDSBeacon*>(m));
+ return preprocess_beacon(op);
case MSG_MON_COMMAND:
- return preprocess_command(static_cast<MMonCommand*>(m));
+ return preprocess_command(op);
case MSG_MDS_OFFLOAD_TARGETS:
- return preprocess_offload_targets(static_cast<MMDSLoadTargets*>(m));
+ return preprocess_offload_targets(op);
default:
assert(0);
last_beacon[gid].seq = seq;
}
-bool MDSMonitor::preprocess_beacon(MMDSBeacon *m)
+bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
{
+ MMDSBeacon *m = static_cast<MMDSBeacon*>(op->get_req());
MDSMap::DaemonState state = m->get_state();
mds_gid_t gid = m->get_global_id();
version_t seq = m->get_seq();
return true;
}
-bool MDSMonitor::preprocess_offload_targets(MMDSLoadTargets* m)
+bool MDSMonitor::preprocess_offload_targets(MonOpRequestRef op)
{
+ MMDSLoadTargets *m = static_cast<MMDSLoadTargets*>(op->get_req());
dout(10) << "preprocess_offload_targets " << *m << " from " << m->get_orig_source() << dendl;
mds_gid_t gid;
}
-bool MDSMonitor::prepare_update(PaxosServiceMessage *m)
+bool MDSMonitor::prepare_update(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
dout(7) << "prepare_update " << *m << dendl;
switch (m->get_type()) {
case MSG_MDS_BEACON:
- return prepare_beacon(static_cast<MMDSBeacon*>(m));
+ return prepare_beacon(op);
case MSG_MON_COMMAND:
- return prepare_command(static_cast<MMonCommand*>(m));
+ return prepare_command(op);
case MSG_MDS_OFFLOAD_TARGETS:
- return prepare_offload_targets(static_cast<MMDSLoadTargets*>(m));
+ return prepare_offload_targets(op);
default:
assert(0);
-bool MDSMonitor::prepare_beacon(MMDSBeacon *m)
+bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
{
+ MMDSBeacon *m = static_cast<MMDSBeacon*>(op->get_req());
// -- this is an update --
dout(12) << "prepare_beacon " << *m << " from " << m->get_orig_source_inst() << dendl;
entity_addr_t addr = m->get_orig_source_inst().addr;
bool failed_mds = false;
while (mds_gid_t existing = pending_mdsmap.find_mds_gid_by_name(m->get_name())) {
if (!mon->osdmon()->is_writeable()) {
- mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, m));
+ mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, op));
return false;
}
fail_mds_gid(existing);
if (!mon->osdmon()->is_writeable()) {
dout(4) << __func__ << ": DAMAGED from rank " << info.rank
<< " waiting for osdmon writeable to blacklist it" << dendl;
- mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, m));
+ mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, op));
return false;
}
dout(7) << "prepare_beacon pending map now:" << dendl;
print_map(pending_mdsmap);
- wait_for_finished_proposal(new C_Updated(this, m));
+ wait_for_finished_proposal(new C_Updated(this, op));
return true;
}
-bool MDSMonitor::prepare_offload_targets(MMDSLoadTargets *m)
+bool MDSMonitor::prepare_offload_targets(MonOpRequestRef op)
{
+ MMDSLoadTargets *m = static_cast<MMDSLoadTargets*>(op->get_req());
mds_gid_t gid = m->global_id;
if (pending_mdsmap.mds_info.count(gid)) {
dout(10) << "prepare_offload_targets " << gid << " " << m->targets << dendl;
return PaxosService::should_propose(delay);
}
-void MDSMonitor::_updated(MMDSBeacon *m)
+void MDSMonitor::_updated(MonOpRequestRef op)
{
+ MMDSBeacon *m = static_cast<MMDSBeacon*>(op->get_req());
dout(10) << "_updated " << m->get_orig_source() << " " << *m << dendl;
mon->clog->info() << m->get_orig_source_inst() << " "
<< ceph_mds_state_name(m->get_state()) << "\n";
f->dump_unsigned("mdsmap_last_committed", get_last_committed());
}
-bool MDSMonitor::preprocess_command(MMonCommand *m)
+bool MDSMonitor::preprocess_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
int r = -1;
bufferlist rdata;
stringstream ss, ds;
return 0;
}
-bool MDSMonitor::prepare_command(MMonCommand *m)
+bool MDSMonitor::prepare_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
int r = -EINVAL;
stringstream ss;
bufferlist rdata;
}
/* Execute filesystem add/remove, or pass through to filesystem_command */
- r = management_command(m, prefix, cmdmap, ss);
+ r = management_command(op, prefix, cmdmap, ss);
if (r >= 0)
goto out;
ss << "No filesystem configured: use `ceph fs new` to create a filesystem";
r = -ENOENT;
} else {
- r = filesystem_command(m, prefix, cmdmap, ss);
+ r = filesystem_command(op, prefix, cmdmap, ss);
if (r < 0 && r == -EAGAIN) {
// Do not reply, the message has been enqueued for retry
return false;
if (r >= 0) {
// success.. delay reply
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, r, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, r, rs,
get_last_committed() + 1));
return true;
} else {
* @retval < 0 An error has occurred; **ss** may have been set.
*/
int MDSMonitor::management_command(
- MMonCommand *m,
+ MonOpRequestRef op,
std::string const &prefix,
map<string, cmd_vartype> &cmdmap,
std::stringstream &ss)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
if (prefix == "mds newfs") {
/* Legacy `newfs` command, takes pool numbers instead of
* names, assumes fs name to be MDS_FS_NAME_DEFAULT, and
// propose. We thus need to make sure the osdmon is writeable before
// we do this, waiting if it's not.
if (!mon->osdmon()->is_writeable()) {
- mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, m));
+ mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, op));
return -EAGAIN;
}
* @retval < 0 An error has occurred; **ss** may have been set.
*/
int MDSMonitor::filesystem_command(
- MMonCommand *m,
+ MonOpRequestRef op,
std::string const &prefix,
map<string, cmd_vartype> &cmdmap,
std::stringstream &ss)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
int r = 0;
string whostr;
cmd_getval(g_ceph_context, cmdmap, "who", whostr);
cmd_getval(g_ceph_context, cmdmap, "who", who);
r = fail_mds(ss, who);
if (r < 0 && r == -EAGAIN) {
- mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, m));
+ mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, op));
return -EAGAIN; // don't propose yet; wait for message to be retried
}
class C_Updated : public Context {
MDSMonitor *mm;
- MMDSBeacon *m;
+ MonOpRequestRef op;
public:
- C_Updated(MDSMonitor *a, MMDSBeacon *c) :
- mm(a), m(c) {}
+ C_Updated(MDSMonitor *a, MonOpRequestRef c) :
+ mm(a), op(c) {}
void finish(int r) {
if (r >= 0)
- mm->_updated(m); // success
+ mm->_updated(op); // success
else if (r == -ECANCELED) {
- mm->mon->no_reply(m);
- m->put();
+ mm->mon->no_reply(op->get_req<PaxosServiceMessage>());
+// m->put();
} else {
- mm->dispatch((PaxosServiceMessage*)m); // try again
+ mm->dispatch(op); // try again
}
}
};
void update_logger();
- void _updated(MMDSBeacon *m);
+ void _updated(MonOpRequestRef op);
- bool preprocess_query(PaxosServiceMessage *m); // true if processed.
- bool prepare_update(PaxosServiceMessage *m);
+ bool preprocess_query(MonOpRequestRef op); // true if processed.
+ bool prepare_update(MonOpRequestRef op);
bool should_propose(double& delay);
void on_active();
void _note_beacon(class MMDSBeacon *m);
- bool preprocess_beacon(class MMDSBeacon *m);
- bool prepare_beacon(class MMDSBeacon *m);
+ bool preprocess_beacon(MonOpRequestRef op);
+ bool prepare_beacon(MonOpRequestRef op);
- bool preprocess_offload_targets(MMDSLoadTargets *m);
- bool prepare_offload_targets(MMDSLoadTargets *m);
+ bool preprocess_offload_targets(MonOpRequestRef op);
+ bool prepare_offload_targets(MonOpRequestRef op);
void get_health(list<pair<health_status_t,string> >& summary,
list<pair<health_status_t,string> > *detail) const;
int fail_mds(std::ostream &ss, const std::string &arg);
void fail_mds_gid(mds_gid_t gid);
- bool preprocess_command(MMonCommand *m);
- bool prepare_command(MMonCommand *m);
+ bool preprocess_command(MonOpRequestRef op);
+ bool prepare_command(MonOpRequestRef op);
int management_command(
- MMonCommand *m,
+ MonOpRequestRef op,
std::string const &prefix,
map<string, cmd_vartype> &cmdmap,
std::stringstream &ss);
int filesystem_command(
- MMonCommand *m,
+ MonOpRequestRef op,
std::string const &prefix,
map<string, cmd_vartype> &cmdmap,
std::stringstream &ss);
admin_hook(NULL),
health_tick_event(NULL),
health_interval_event(NULL),
- routed_request_tid(0)
+ routed_request_tid(0),
+ op_tracker(cct, true, 1)
{
rank = -1;
paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
- paxos_service[PAXOS_OSDMAP] = new OSDMonitor(this, paxos, "osdmap");
+ paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
bootstrap();
}
-void Monitor::handle_sync(MMonSync *m)
+void Monitor::handle_sync(MonOpRequestRef op)
{
+ MMonSync *m = static_cast<MMonSync*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
switch (m->op) {
case MMonSync::OP_GET_COOKIE_FULL:
case MMonSync::OP_GET_COOKIE_RECENT:
- handle_sync_get_cookie(m);
+ handle_sync_get_cookie(op);
break;
case MMonSync::OP_GET_CHUNK:
- handle_sync_get_chunk(m);
+ handle_sync_get_chunk(op);
break;
// client -----------
case MMonSync::OP_COOKIE:
- handle_sync_cookie(m);
+ handle_sync_cookie(op);
break;
case MMonSync::OP_CHUNK:
case MMonSync::OP_LAST_CHUNK:
- handle_sync_chunk(m);
+ handle_sync_chunk(op);
break;
case MMonSync::OP_NO_COOKIE:
- handle_sync_no_cookie(m);
+ handle_sync_no_cookie(op);
break;
default:
// leader
-void Monitor::_sync_reply_no_cookie(MMonSync *m)
+void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
{
+ MMonSync *m = static_cast<MMonSync*>(op->get_req());
MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
m->get_connection()->send_message(reply);
}
-void Monitor::handle_sync_get_cookie(MMonSync *m)
+void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
{
+ MMonSync *m = static_cast<MMonSync*>(op->get_req());
if (is_synchronizing()) {
- _sync_reply_no_cookie(m);
+ _sync_reply_no_cookie(op);
return;
}
m->get_connection()->send_message(reply);
}
-void Monitor::handle_sync_get_chunk(MMonSync *m)
+void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
{
+ MMonSync *m = static_cast<MMonSync*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
if (sync_providers.count(m->cookie) == 0) {
dout(10) << __func__ << " no cookie " << m->cookie << dendl;
- _sync_reply_no_cookie(m);
+ _sync_reply_no_cookie(op);
return;
}
dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
<< " < our fc " << paxos->get_first_committed() << dendl;
sync_providers.erase(m->cookie);
- _sync_reply_no_cookie(m);
+ _sync_reply_no_cookie(op);
return;
}
// requester
-void Monitor::handle_sync_cookie(MMonSync *m)
+void Monitor::handle_sync_cookie(MonOpRequestRef op)
{
+ MMonSync *m = static_cast<MMonSync*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
if (sync_cookie) {
dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
assert(g_conf->mon_sync_requester_kill_at != 4);
}
-void Monitor::handle_sync_chunk(MMonSync *m)
+void Monitor::handle_sync_chunk(MonOpRequestRef op)
{
+ MMonSync *m = static_cast<MMonSync*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
if (m->cookie != sync_cookie) {
}
}
-void Monitor::handle_sync_no_cookie(MMonSync *m)
+void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
{
dout(10) << __func__ << dendl;
bootstrap();
bootstrap();
}
-void Monitor::handle_probe(MMonProbe *m)
+void Monitor::handle_probe(MonOpRequestRef op)
{
+ MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
dout(10) << "handle_probe " << *m << dendl;
if (m->fsid != monmap->fsid) {
switch (m->op) {
case MMonProbe::OP_PROBE:
- handle_probe_probe(m);
+ handle_probe_probe(op);
break;
case MMonProbe::OP_REPLY:
- handle_probe_reply(m);
+ handle_probe_reply(op);
break;
case MMonProbe::OP_MISSING_FEATURES:
/**
* @todo fix this. This is going to cause trouble.
*/
-void Monitor::handle_probe_probe(MMonProbe *m)
+void Monitor::handle_probe_probe(MonOpRequestRef op)
{
+ MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
MMonProbe *r;
dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
m->put();
}
-void Monitor::handle_probe_reply(MMonProbe *m)
+void Monitor::handle_probe_reply(MonOpRequestRef op)
{
+ MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
dout(10) << " monmap is " << *monmap << dendl;
auth_cluster_required == "cephx";
}
-void Monitor::handle_command(MMonCommand *m)
+void Monitor::handle_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
if (m->fsid != monmap->fsid) {
dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
reply_command(m, -EPERM, "wrong fsid", 0);
<< "cmd=" << m->cmd << ": dispatch";
if (module == "mds" || module == "fs") {
- mdsmon()->dispatch(m);
+ mdsmon()->dispatch(op);
return;
}
if (module == "osd") {
- osdmon()->dispatch(m);
+ osdmon()->dispatch(op);
return;
}
if (module == "pg") {
- pgmon()->dispatch(m);
+ pgmon()->dispatch(op);
return;
}
if (module == "mon" &&
prefix != "mon scrub" &&
prefix != "mon sync force" &&
prefix != "mon metadata") {
- monmon()->dispatch(m);
+ monmon()->dispatch(op);
return;
}
if (module == "auth") {
- authmon()->dispatch(m);
+ authmon()->dispatch(op);
return;
}
if (module == "log") {
- logmon()->dispatch(m);
+ logmon()->dispatch(op);
return;
}
if (module == "config-key") {
- config_key_service->dispatch(m);
+ config_key_service->dispatch(op);
return;
}
// make sure our map is readable and up to date
if (!is_leader() && !is_peon()) {
dout(10) << " waiting for quorum" << dendl;
- waitfor_quorum.push_back(new C_RetryMessage(this, m));
+ waitfor_quorum.push_back(new C_RetryMessage(this, op));
return;
}
_quorum_status(f.get(), ds);
};
//extract the original message and put it into the regular dispatch function
-void Monitor::handle_forward(MForward *m)
+void Monitor::handle_forward(MonOpRequestRef op)
{
+ MForward *m = static_cast<MForward*>(op->get_req());
dout(10) << "received forwarded message from " << m->client
<< " via " << m->get_source_inst() << dendl;
MonSession *session = static_cast<MonSession *>(m->get_connection()->get_priv());
session->put();
}
-void Monitor::handle_route(MRoute *m)
+void Monitor::handle_route(MonOpRequestRef op)
{
+ MRoute *m = static_cast<MRoute*>(op->get_req());
MonSession *session = static_cast<MonSession *>(m->get_connection()->get_priv());
//check privileges
if (session && !session->is_capable("mon", MON_CAP_X)) {
if (mon == rank) {
dout(10) << " requeue for self tid " << rr->tid << " " << *req << dendl;
req->set_connection(rr->con);
- retry.push_back(new C_RetryMessage(this, req));
+ MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(req);
+ retry.push_back(new C_RetryMessage(this, op));
delete rr;
} else {
dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
try_send_message(c, inst);
}
-void Monitor::waitlist_or_zap_client(Message *m)
+void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
{
/**
* Wait list the new session until we're in the quorum, assuming it's
* 3) command messages. We want to accept these under all possible
* circumstances.
*/
- ConnectionRef con = m->get_connection();
+ Message *m = op->get_req();
+ ConnectionRef con = op->get_connection();
utime_t too_old = ceph_clock_now(g_ceph_context);
too_old -= g_ceph_context->_conf->mon_lease;
if (m->get_recv_stamp() > too_old &&
con->is_connected()) {
dout(5) << "waitlisting message " << *m << dendl;
- maybe_wait_for_quorum.push_back(new C_RetryMessage(this, m));
+ maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
} else {
dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
con->mark_down();
return;
}
- ConnectionRef connection = m->get_connection();
+ MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
+ dispatch(op);
+}
+
+void Monitor::dispatch(MonOpRequestRef op)
+{
+ ConnectionRef connection = op->get_connection();
MonSession *s = NULL;
MonCap caps;
- bool src_is_mon;
+ bool src_is_mon = op->is_src_mon();
// regardless of who we are or who the sender is, the message must
// have a connection associated. If it doesn't then something fishy
// is going on.
assert(connection);
- src_is_mon = (connection->get_peer_type() & CEPH_ENTITY_TYPE_MON);
-
bool reuse_caps = false;
dout(20) << "have connection" << dendl;
s = static_cast<MonSession *>(connection->get_priv());
s->put();
s = NULL;
}
+ Message *m = op->get_req();
if (!s) {
// if the sender is not a monitor, make sure their first message for a
// session is an MAuth. If it is not, assume it's a stray message,
m->get_type() != CEPH_MSG_MON_GET_MAP)) {
if (m->get_type() == CEPH_MSG_PING) {
// let it go through and be dispatched immediately!
- return dispatch(s, m, false);
+ return dispatch_op(op);
}
dout(1) << __func__ << " dropping stray message " << *m
<< " from " << m->get_source_inst() << dendl;
- m->put();
return;
}
if (!exited_quorum.is_zero() && !src_is_mon) {
- waitlist_or_zap_client(m);
+ waitlist_or_zap_client(op);
return;
}
s = session_map.new_session(m->get_source_inst(), m->get_connection().get());
m->get_connection()->set_priv(s->get());
dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl;
+ op->set_session(s);
logger->set(l_mon_num_sessions, session_map.get_size());
logger->inc(l_mon_session_add);
} else {
dout(20) << "ms_dispatch existing session " << s << " for " << s->inst << dendl;
}
+ op->set_session(s);
assert(s);
if (s->auth_handler) {
dout(20) << " caps " << s->caps.get_str() << dendl;
if (is_synchronizing() && !src_is_mon) {
- waitlist_or_zap_client(m);
+ waitlist_or_zap_client(op);
return;
}
- dispatch(s, m, src_is_mon);
+ dispatch_op(op);
s->put();
return;
}
-void Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon)
+void Monitor::dispatch_op(MonOpRequestRef op)
{
- assert(m != NULL);
-
/* deal with all messages that do not necessarily need caps */
bool dealt_with = true;
- switch (m->get_type()) {
+ switch (op->get_req()->get_type()) {
// auth
case MSG_MON_GLOBAL_ID:
case CEPH_MSG_AUTH:
/* no need to check caps here */
- paxos_service[PAXOS_AUTH]->dispatch((PaxosServiceMessage*)m);
+ paxos_service[PAXOS_AUTH]->dispatch(op);
break;
case CEPH_MSG_PING:
- handle_ping(static_cast<MPing*>(m));
+ handle_ping(op);
break;
/* MMonGetMap may be used by clients to obtain a monmap *before*
* not authenticate when obtaining a monmap.
*/
case CEPH_MSG_MON_GET_MAP:
- handle_mon_get_map(static_cast<MMonGetMap*>(m));
+ handle_mon_get_map(op);
break;
case CEPH_MSG_MON_METADATA:
- return handle_mon_metadata(static_cast<MMonMetadata*>(m));
+ return handle_mon_metadata(op);
default:
dealt_with = false;
/* deal with all messages which caps should be checked somewhere else */
dealt_with = true;
- switch (m->get_type()) {
+ switch (op->get_req()->get_type()) {
// OSDs
case CEPH_MSG_MON_GET_OSDMAP:
case MSG_OSD_ALIVE:
case MSG_OSD_PGTEMP:
case MSG_REMOVE_SNAPS:
- paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m);
+ paxos_service[PAXOS_OSDMAP]->dispatch(op);
break;
// MDSs
case MSG_MDS_BEACON:
case MSG_MDS_OFFLOAD_TARGETS:
- paxos_service[PAXOS_MDSMAP]->dispatch((PaxosServiceMessage*)m);
+ paxos_service[PAXOS_MDSMAP]->dispatch(op);
break;
case CEPH_MSG_STATFS:
case MSG_PGSTATS:
case MSG_GETPOOLSTATS:
- paxos_service[PAXOS_PGMAP]->dispatch((PaxosServiceMessage*)m);
+ paxos_service[PAXOS_PGMAP]->dispatch(op);
break;
case CEPH_MSG_POOLOP:
- paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m);
+ paxos_service[PAXOS_OSDMAP]->dispatch(op);
break;
// log
case MSG_LOG:
- paxos_service[PAXOS_LOG]->dispatch((PaxosServiceMessage*)m);
+ paxos_service[PAXOS_LOG]->dispatch(op);
break;
// handle_command() does its own caps checking
case MSG_MON_COMMAND:
- handle_command(static_cast<MMonCommand*>(m));
+ handle_command(op);
break;
default:
/* messages we, the Monitor class, need to deal with
* but may be sent by clients. */
- if (!s->is_capable("mon", MON_CAP_R)) {
- dout(5) << __func__ << " " << m->get_source_inst()
- << " not enough caps for " << *m << " -- dropping"
+ if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
+ dout(5) << __func__ << " " << op->get_req()->get_source_inst()
+ << " not enough caps for " << *(op->get_req()) << " -- dropping"
<< dendl;
goto drop;
}
dealt_with = true;
- switch (m->get_type()) {
+ switch (op->get_req()->get_type()) {
// misc
case CEPH_MSG_MON_GET_VERSION:
- handle_get_version(static_cast<MMonGetVersion*>(m));
+ handle_get_version(op);
break;
case CEPH_MSG_MON_SUBSCRIBE:
/* FIXME: check what's being subscribed, filter accordingly */
- handle_subscribe(static_cast<MMonSubscribe*>(m));
+ handle_subscribe(op);
break;
default:
if (dealt_with)
return;
- if (!src_is_mon) {
+ if (!op->is_src_mon()) {
dout(1) << __func__ << " unexpected monitor message from"
- << " non-monitor entity " << m->get_source_inst()
- << " " << *m << " -- dropping" << dendl;
+ << " non-monitor entity " << op->get_req()->get_source_inst()
+ << " " << *(op->get_req()) << " -- dropping" << dendl;
goto drop;
}
/* messages that should only be sent by another monitor */
dealt_with = true;
- switch (m->get_type()) {
+ switch (op->get_req()->get_type()) {
case MSG_ROUTE:
- handle_route(static_cast<MRoute*>(m));
+ handle_route(op);
break;
case MSG_MON_PROBE:
- handle_probe(static_cast<MMonProbe*>(m));
+ handle_probe(op);
break;
// Sync (i.e., the new slurp, but on steroids)
case MSG_MON_SYNC:
- handle_sync(static_cast<MMonSync*>(m));
+ handle_sync(op);
break;
case MSG_MON_SCRUB:
- handle_scrub(static_cast<MMonScrub*>(m));
+ handle_scrub(op);
break;
/* log acks are sent from a monitor we sent the MLog to, and are
never sent by clients to us. */
case MSG_LOGACK:
- log_client.handle_log_ack((MLogAck*)m);
- m->put();
+ log_client.handle_log_ack((MLogAck*)op->get_req());
+ //m->put();
break;
// monmap
case MSG_MON_JOIN:
- paxos_service[PAXOS_MONMAP]->dispatch((PaxosServiceMessage*)m);
+ paxos_service[PAXOS_MONMAP]->dispatch(op);
break;
// paxos
case MSG_MON_PAXOS:
{
- MMonPaxos *pm = static_cast<MMonPaxos*>(m);
- if (!src_is_mon ||
- !s->is_capable("mon", MON_CAP_X)) {
+ MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
+ if (!op->is_src_mon() ||
+ !op->get_session()->is_capable("mon", MON_CAP_X)) {
//can't send these!
pm->put();
break;
break;
}
- paxos->dispatch((PaxosServiceMessage*)m);
+ paxos->dispatch(op);
}
break;
// elector messages
case MSG_MON_ELECTION:
//check privileges here for simplicity
- if (s &&
- !s->is_capable("mon", MON_CAP_X)) {
+ if (op->get_session() &&
+ !op->get_session()->is_capable("mon", MON_CAP_X)) {
dout(0) << "MMonElection received from entity without enough caps!"
- << s->caps << dendl;
- m->put();
+ << op->get_session()->caps << dendl;
+ //m->put();
break;
}
if (!is_probing() && !is_synchronizing()) {
- elector.dispatch(m);
- } else {
- m->put();
+ elector.dispatch(op);
}
break;
case MSG_FORWARD:
- handle_forward(static_cast<MForward *>(m));
+ handle_forward(op);
break;
case MSG_TIMECHECK:
- handle_timecheck(static_cast<MTimeCheck *>(m));
+ handle_timecheck(op);
break;
case MSG_MON_HEALTH:
- health_monitor->dispatch(static_cast<MMonHealth *>(m));
+ health_monitor->dispatch(op);
break;
default:
break;
}
if (!dealt_with) {
- dout(1) << "dropping unexpected " << *m << dendl;
+ dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
goto drop;
}
return;
drop:
- m->put();
+ //m->put();
+ return;
}
-void Monitor::handle_ping(MPing *m)
+void Monitor::handle_ping(MonOpRequestRef op)
{
+ MPing *m = static_cast<MPing*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
MPing *reply = new MPing;
entity_inst_t inst = m->get_source_inst();
return status;
}
-void Monitor::handle_timecheck_leader(MTimeCheck *m)
+void Monitor::handle_timecheck_leader(MonOpRequestRef op)
{
+ MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
/* handles PONG's */
assert(m->op == MTimeCheck::OP_PONG);
}
}
-void Monitor::handle_timecheck_peon(MTimeCheck *m)
+void Monitor::handle_timecheck_peon(MonOpRequestRef op)
{
+ MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
assert(is_peon());
m->get_connection()->send_message(reply);
}
-void Monitor::handle_timecheck(MTimeCheck *m)
+void Monitor::handle_timecheck(MonOpRequestRef op)
{
+ MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
if (is_leader()) {
if (m->op != MTimeCheck::OP_PONG) {
dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
} else {
- handle_timecheck_leader(m);
+ handle_timecheck_leader(op);
}
} else if (is_peon()) {
if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
} else {
- handle_timecheck_peon(m);
+ handle_timecheck_peon(op);
}
} else {
dout(1) << __func__ << " drop unexpected msg" << dendl;
m->put();
}
-void Monitor::handle_subscribe(MMonSubscribe *m)
+void Monitor::handle_subscribe(MonOpRequestRef op)
{
+ MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
dout(10) << "handle_subscribe " << *m << dendl;
bool reply = false;
m->put();
}
-void Monitor::handle_get_version(MMonGetVersion *m)
+void Monitor::handle_get_version(MonOpRequestRef op)
{
+ MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
dout(10) << "handle_get_version " << *m << dendl;
PaxosService *svc = NULL;
if (!is_leader() && !is_peon()) {
dout(10) << " waiting for quorum" << dendl;
- waitfor_quorum.push_back(new C_RetryMessage(this, m));
+ waitfor_quorum.push_back(new C_RetryMessage(this, op));
goto out;
}
if (svc) {
if (!svc->is_readable()) {
- svc->wait_for_readable(new C_RetryMessage(this, m));
+ svc->wait_for_readable(new C_RetryMessage(this, op));
goto out;
}
con->send_message(new MMonMap(bl));
}
-void Monitor::handle_mon_get_map(MMonGetMap *m)
+void Monitor::handle_mon_get_map(MonOpRequestRef op)
{
+ MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
dout(10) << "handle_mon_get_map" << dendl;
send_latest_monmap(m->get_connection().get());
m->put();
}
-void Monitor::handle_mon_metadata(MMonMetadata *m)
+void Monitor::handle_mon_metadata(MonOpRequestRef op)
{
+ MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
if (is_leader()) {
dout(10) << __func__ << dendl;
update_mon_metadata(m->get_source().num(), m->data);
return 0;
}
-void Monitor::handle_scrub(MMonScrub *m)
+void Monitor::handle_scrub(MonOpRequestRef op)
{
+ MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
switch (m->op) {
case MMonScrub::OP_SCRUB:
#include "include/str_map.h"
#include <errno.h>
+#include "common/TrackedOp.h"
+#include "mon/MonOpRequest.h"
+
#define CEPH_MON_PROTOCOL 13 /* cluster internal */
*/
int scrub_start();
int scrub();
- void handle_scrub(MMonScrub *m);
+ void handle_scrub(MonOpRequestRef op);
bool _scrub(ScrubResult *r,
pair<string,string> *start,
int *num_keys);
*
* @param m Sync message with operation type MMonSync::OP_START_CHUNKS
*/
- void handle_sync(MMonSync *m);
+ void handle_sync(MonOpRequestRef op);
- void _sync_reply_no_cookie(MMonSync *m);
+ void _sync_reply_no_cookie(MonOpRequestRef op);
- void handle_sync_get_cookie(MMonSync *m);
- void handle_sync_get_chunk(MMonSync *m);
- void handle_sync_finish(MMonSync *m);
+ void handle_sync_get_cookie(MonOpRequestRef op);
+ void handle_sync_get_chunk(MonOpRequestRef op);
+ void handle_sync_finish(MonOpRequestRef op);
- void handle_sync_cookie(MMonSync *m);
- void handle_sync_forward(MMonSync *m);
- void handle_sync_chunk(MMonSync *m);
- void handle_sync_no_cookie(MMonSync *m);
+ void handle_sync_cookie(MonOpRequestRef op);
+ void handle_sync_forward(MonOpRequestRef op);
+ void handle_sync_chunk(MonOpRequestRef op);
+ void handle_sync_no_cookie(MonOpRequestRef op);
/**
* @} // Synchronization
health_status_t timecheck_status(ostringstream &ss,
const double skew_bound,
const double latency);
- void handle_timecheck_leader(MTimeCheck *m);
- void handle_timecheck_peon(MTimeCheck *m);
- void handle_timecheck(MTimeCheck *m);
+ void handle_timecheck_leader(MonOpRequestRef op);
+ void handle_timecheck_peon(MonOpRequestRef op);
+ void handle_timecheck(MonOpRequestRef op);
/**
* @}
*/
/**
* Handle ping messages from others.
*/
- void handle_ping(MPing *m);
+ void handle_ping(MonOpRequestRef op);
Context *probe_timeout_event; // for probing
void send_latest_monmap(Connection *con);
// messages
- void handle_get_version(MMonGetVersion *m);
- void handle_subscribe(MMonSubscribe *m);
- void handle_mon_get_map(MMonGetMap *m);
+ void handle_get_version(MonOpRequestRef op);
+ void handle_subscribe(MonOpRequestRef op);
+ void handle_mon_get_map(MonOpRequestRef op);
static void _generate_command_map(map<string,cmd_vartype>& cmdmap,
map<string,string> ¶m_str_map);
void get_mon_status(Formatter *f, ostream& ss);
void _quorum_status(Formatter *f, ostream& ss);
bool _add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss);
- void handle_command(class MMonCommand *m);
- void handle_route(MRoute *m);
+ void handle_command(MonOpRequestRef op);
+ void handle_route(MonOpRequestRef op);
- void handle_mon_metadata(MMonMetadata *m);
+ void handle_mon_metadata(MonOpRequestRef op);
int get_mon_metadata(int mon, Formatter *f, ostream& err);
int print_nodes(Formatter *f, ostream& err);
map<int, Metadata> metadata;
void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata, version_t version);
- void handle_probe(MMonProbe *m);
+ void handle_probe(MonOpRequestRef op);
/**
* Handle a Probe Operation, replying with our name, quorum and known versions.
*
*
* @param m A Probe message, with an operation of type Probe.
*/
- void handle_probe_probe(MMonProbe *m);
- void handle_probe_reply(MMonProbe *m);
+ void handle_probe_probe(MonOpRequestRef op);
+ void handle_probe_reply(MonOpRequestRef op);
// request routing
struct RoutedRequest {
map<uint64_t, RoutedRequest*> routed_requests;
void forward_request_leader(PaxosServiceMessage *req);
- void handle_forward(MForward *m);
+ void handle_forward(MonOpRequestRef op);
void try_send_message(Message *m, const entity_inst_t& to);
void send_reply(PaxosServiceMessage *req, Message *reply);
void no_reply(PaxosServiceMessage *req);
void resend_routed_requests();
void remove_session(MonSession *s);
void remove_all_sessions();
- void waitlist_or_zap_client(Message *m);
+ void waitlist_or_zap_client(MonOpRequestRef op);
void send_command(const entity_inst_t& inst,
const vector<string>& com);
public:
struct C_Command : public Context {
Monitor *mon;
- MMonCommand *m;
+ MonOpRequestRef op;
int rc;
string rs;
bufferlist rdata;
version_t version;
- C_Command(Monitor *_mm, MMonCommand *_m, int r, string s, version_t v) :
- mon(_mm), m(_m), rc(r), rs(s), version(v){}
- C_Command(Monitor *_mm, MMonCommand *_m, int r, string s, bufferlist rd, version_t v) :
- mon(_mm), m(_m), rc(r), rs(s), rdata(rd), version(v){}
+ C_Command(Monitor *_mm, MonOpRequestRef _op, int r, string s, version_t v) :
+ mon(_mm), op(_op), rc(r), rs(s), version(v){}
+ C_Command(Monitor *_mm, MonOpRequestRef _op, int r, string s, bufferlist rd, version_t v) :
+ mon(_mm), op(_op), rc(r), rs(s), rdata(rd), version(v){}
void finish(int r) {
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
if (r >= 0) {
ostringstream ss;
- if (!m->get_connection()) {
+ if (!op->get_req()->get_connection()) {
ss << "connection dropped for command ";
} else {
MonSession *s = m->get_session();
else if (r == -ECANCELED)
m->put();
else if (r == -EAGAIN)
- mon->_ms_dispatch(m);
+ mon->dispatch_op(op);
else
assert(0 == "bad C_Command return value");
}
private:
class C_RetryMessage : public Context {
Monitor *mon;
+ MonOpRequestRef op;
Message *msg;
public:
- C_RetryMessage(Monitor *m, Message *ms) : mon(m), msg(ms) {}
+ C_RetryMessage(Monitor *m, MonOpRequestRef o) : mon(m), op(o) {}
void finish(int r) {
if (r == -EAGAIN || r >= 0)
- mon->_ms_dispatch(msg);
+ mon->dispatch_op(op);
else if (r == -ECANCELED)
- msg->put();
+ return;
else
assert(0 == "bad C_RetryMessage return value");
}
return true;
}
// dissociate message handling from session and connection logic
- void dispatch(MonSession *s, Message *m, const bool src_is_mon);
+ void dispatch(MonOpRequestRef op);
+ void dispatch_op(MonOpRequestRef op);
//mon_caps is used for un-connected messages from monitors
MonCap * mon_caps;
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
void read_features();
void write_features(MonitorDBStore::TransactionRef t);
+ OpTracker op_tracker;
+
public:
Monitor(CephContext *cct_, string nm, MonitorDBStore *s,
Messenger *m, MonMap *map);
mon->clog->info() << "monmap " << *mon->monmap << "\n";
}
-bool MonmapMonitor::preprocess_query(PaxosServiceMessage *m)
+bool MonmapMonitor::preprocess_query(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
switch (m->get_type()) {
// READs
case MSG_MON_COMMAND:
- return preprocess_command(static_cast<MMonCommand*>(m));
+ return preprocess_command(op);
case MSG_MON_JOIN:
- return preprocess_join(static_cast<MMonJoin*>(m));
+ return preprocess_join(op);
default:
assert(0);
m->put();
f->close_section();
}
-bool MonmapMonitor::preprocess_command(MMonCommand *m)
+bool MonmapMonitor::preprocess_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
int r = -1;
bufferlist rdata;
stringstream ss;
}
-bool MonmapMonitor::prepare_update(PaxosServiceMessage *m)
+bool MonmapMonitor::prepare_update(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
dout(7) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case MSG_MON_COMMAND:
- return prepare_command(static_cast<MMonCommand*>(m));
+ return prepare_command(op);
case MSG_MON_JOIN:
- return prepare_join(static_cast<MMonJoin*>(m));
+ return prepare_join(op);
default:
assert(0);
m->put();
return false;
}
-bool MonmapMonitor::prepare_command(MMonCommand *m)
+bool MonmapMonitor::prepare_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
stringstream ss;
string rs;
int err = -EINVAL;
pending_map.add(name, addr);
pending_map.last_changed = ceph_clock_now(g_ceph_context);
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
return false;
}
-bool MonmapMonitor::preprocess_join(MMonJoin *join)
+bool MonmapMonitor::preprocess_join(MonOpRequestRef op)
{
+ MMonJoin *join = static_cast<MMonJoin*>(op->get_req());
dout(10) << "preprocess_join " << join->name << " at " << join->addr << dendl;
MonSession *session = join->get_session();
}
return false;
}
-bool MonmapMonitor::prepare_join(MMonJoin *join)
+bool MonmapMonitor::prepare_join(MonOpRequestRef op)
{
+ MMonJoin *join = static_cast<MMonJoin*>(op->get_req());
dout(0) << "adding/updating " << join->name << " at " << join->addr << " to monitor cluster" << dendl;
if (pending_map.contains(join->name))
pending_map.remove(join->name);
void dump_info(Formatter *f);
- bool preprocess_query(PaxosServiceMessage *m);
- bool prepare_update(PaxosServiceMessage *m);
+ bool preprocess_query(MonOpRequestRef op);
+ bool prepare_update(MonOpRequestRef op);
- bool preprocess_join(MMonJoin *m);
- bool prepare_join(MMonJoin *m);
+ bool preprocess_join(MonOpRequestRef op);
+ bool prepare_join(MonOpRequestRef op);
- bool preprocess_command(MMonCommand *m);
- bool prepare_command(MMonCommand *m);
+ bool preprocess_command(MonOpRequestRef op);
+ bool prepare_command(MonOpRequestRef op);
void get_health(list<pair<health_status_t,string> >& summary,
list<pair<health_status_t,string> > *detail) const;
list<MOSDFailure*> ls;
take_all_failures(ls);
while (!ls.empty()) {
- dispatch(ls.front());
+ MonOpRequestRef op =
+ mon->op_tracker.create_request<MonOpRequest>(ls.front());
+ dispatch(op);
ls.pop_front();
}
}
// -------------
-bool OSDMonitor::preprocess_query(PaxosServiceMessage *m)
+bool OSDMonitor::preprocess_query(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
// READs
case MSG_MON_COMMAND:
- return preprocess_command(static_cast<MMonCommand*>(m));
+ return preprocess_command(op);
case CEPH_MSG_MON_GET_OSDMAP:
- return preprocess_get_osdmap(static_cast<MMonGetOSDMap*>(m));
+ return preprocess_get_osdmap(op);
// damp updates
case MSG_OSD_MARK_ME_DOWN:
- return preprocess_mark_me_down(static_cast<MOSDMarkMeDown*>(m));
+ return preprocess_mark_me_down(op);
case MSG_OSD_FAILURE:
- return preprocess_failure(static_cast<MOSDFailure*>(m));
+ return preprocess_failure(op);
case MSG_OSD_BOOT:
- return preprocess_boot(static_cast<MOSDBoot*>(m));
+ return preprocess_boot(op);
case MSG_OSD_ALIVE:
- return preprocess_alive(static_cast<MOSDAlive*>(m));
+ return preprocess_alive(op);
case MSG_OSD_PGTEMP:
- return preprocess_pgtemp(static_cast<MOSDPGTemp*>(m));
+ return preprocess_pgtemp(op);
case CEPH_MSG_POOLOP:
- return preprocess_pool_op(static_cast<MPoolOp*>(m));
+ return preprocess_pool_op(op);
case MSG_REMOVE_SNAPS:
- return preprocess_remove_snaps(static_cast<MRemoveSnaps*>(m));
+ return preprocess_remove_snaps(op);
default:
assert(0);
}
}
-bool OSDMonitor::prepare_update(PaxosServiceMessage *m)
+bool OSDMonitor::prepare_update(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
dout(7) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
// damp updates
case MSG_OSD_MARK_ME_DOWN:
- return prepare_mark_me_down(static_cast<MOSDMarkMeDown*>(m));
+ return prepare_mark_me_down(op);
case MSG_OSD_FAILURE:
- return prepare_failure(static_cast<MOSDFailure*>(m));
+ return prepare_failure(op);
case MSG_OSD_BOOT:
- return prepare_boot(static_cast<MOSDBoot*>(m));
+ return prepare_boot(op);
case MSG_OSD_ALIVE:
- return prepare_alive(static_cast<MOSDAlive*>(m));
+ return prepare_alive(op);
case MSG_OSD_PGTEMP:
- return prepare_pgtemp(static_cast<MOSDPGTemp*>(m));
+ return prepare_pgtemp(op);
case MSG_MON_COMMAND:
- return prepare_command(static_cast<MMonCommand*>(m));
+ return prepare_command(op);
case CEPH_MSG_POOLOP:
- return prepare_pool_op(static_cast<MPoolOp*>(m));
+ return prepare_pool_op(op);
case MSG_REMOVE_SNAPS:
- return prepare_remove_snaps(static_cast<MRemoveSnaps*>(m));
+ return prepare_remove_snaps(op);
default:
assert(0);
// ---------------------------
// READs
-bool OSDMonitor::preprocess_get_osdmap(MMonGetOSDMap *m)
+bool OSDMonitor::preprocess_get_osdmap(MonOpRequestRef op)
{
+ MMonGetOSDMap *m = static_cast<MMonGetOSDMap*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
MOSDMap *reply = new MOSDMap(mon->monmap->fsid);
epoch_t first = get_first_committed();
}
-bool OSDMonitor::preprocess_failure(MOSDFailure *m)
+bool OSDMonitor::preprocess_failure(MonOpRequestRef op)
{
+ MOSDFailure *m = static_cast<MOSDFailure*>(op->get_req());
// who is target_osd
int badboy = m->get_target().name.num();
class C_AckMarkedDown : public Context {
OSDMonitor *osdmon;
- MOSDMarkMeDown *m;
+ MonOpRequestRef op;
public:
C_AckMarkedDown(
OSDMonitor *osdmon,
- MOSDMarkMeDown *m)
- : osdmon(osdmon), m(m) {}
+ MonOpRequestRef op)
+ : osdmon(osdmon), op(op) {}
void finish(int) {
+ MOSDMarkMeDown *m = static_cast<MOSDMarkMeDown*>(op->get_req());
osdmon->mon->send_reply(
m,
new MOSDMarkMeDown(
}
};
-bool OSDMonitor::preprocess_mark_me_down(MOSDMarkMeDown *m)
+bool OSDMonitor::preprocess_mark_me_down(MonOpRequestRef op)
{
+ MOSDMarkMeDown *m = static_cast<MOSDMarkMeDown*>(op->get_req());
int requesting_down = m->get_target().name.num();
int from = m->get_orig_source().num();
reply:
if (m->request_ack) {
- Context *c(new C_AckMarkedDown(this, m));
+ Context *c(new C_AckMarkedDown(this, op));
c->complete(0);
}
return true;
}
-bool OSDMonitor::prepare_mark_me_down(MOSDMarkMeDown *m)
+bool OSDMonitor::prepare_mark_me_down(MonOpRequestRef op)
{
+ MOSDMarkMeDown *m = static_cast<MOSDMarkMeDown*>(op->get_req());
int target_osd = m->get_target().name.num();
assert(osdmap.is_up(target_osd));
mon->clog->info() << "osd." << target_osd << " marked itself down\n";
pending_inc.new_state[target_osd] = CEPH_OSD_UP;
if (m->request_ack)
- wait_for_finished_proposal(new C_AckMarkedDown(this, m));
+ wait_for_finished_proposal(new C_AckMarkedDown(this, op));
return true;
}
return false;
}
-bool OSDMonitor::prepare_failure(MOSDFailure *m)
+bool OSDMonitor::prepare_failure(MonOpRequestRef op)
{
+ MOSDFailure *m = static_cast<MOSDFailure*>(op->get_req());
dout(1) << "prepare_failure " << m->get_target() << " from " << m->get_orig_source_inst()
<< " is reporting failure:" << m->if_osd_failed() << dendl;
// boot --
-bool OSDMonitor::preprocess_boot(MOSDBoot *m)
+bool OSDMonitor::preprocess_boot(MonOpRequestRef op)
{
+ MOSDBoot *m = static_cast<MOSDBoot*>(op->get_req());
int from = m->get_orig_source_inst().name.num();
// check permissions, ignore if failed (no response expected)
// yup.
dout(7) << "preprocess_boot dup from " << m->get_orig_source_inst()
<< " == " << osdmap.get_inst(from) << dendl;
- _booted(m, false);
+ _booted(op, false);
return true;
}
return true;
}
-bool OSDMonitor::prepare_boot(MOSDBoot *m)
+bool OSDMonitor::prepare_boot(MonOpRequestRef op)
{
+ MOSDBoot *m = static_cast<MOSDBoot*>(op->get_req());
dout(7) << "prepare_boot from " << m->get_orig_source_inst() << " sb " << m->sb
<< " cluster_addr " << m->cluster_addr
<< " hb_back_addr " << m->hb_back_addr
// mark previous guy down
pending_inc.new_state[from] = CEPH_OSD_UP;
}
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
} else if (pending_inc.new_up_client.count(from)) { //FIXME: should this be using new_up_client?
// already prepared, just wait
dout(7) << "prepare_boot already prepared, waiting on " << m->get_orig_source_addr() << dendl;
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
} else {
// mark new guy up.
pending_inc.new_up_client[from] = m->get_orig_source_addr();
pending_inc.new_xinfo[from] = xi;
// wait
- wait_for_finished_proposal(new C_Booted(this, m));
+ wait_for_finished_proposal(new C_Booted(this, op));
}
return true;
}
-void OSDMonitor::_booted(MOSDBoot *m, bool logit)
+void OSDMonitor::_booted(MonOpRequestRef op, bool logit)
{
- dout(7) << "_booted " << m->get_orig_source_inst()
+ MOSDBoot *m = static_cast<MOSDBoot*>(op->get_req());
+ dout(7) << "_booted " << m->get_orig_source_inst()
<< " w " << m->sb.weight << " from " << m->sb.current_epoch << dendl;
if (logit) {
// -------------
// alive
-bool OSDMonitor::preprocess_alive(MOSDAlive *m)
+bool OSDMonitor::preprocess_alive(MonOpRequestRef op)
{
+ MOSDAlive *m = static_cast<MOSDAlive*>(op->get_req());
int from = m->get_orig_source().num();
// check permissions, ignore if failed
if (osdmap.get_up_thru(from) >= m->want) {
// yup.
dout(7) << "preprocess_alive want up_thru " << m->want << " dup from " << m->get_orig_source_inst() << dendl;
- _reply_map(m, m->version);
+ _reply_map(op, m->version);
return true;
}
return true;
}
-bool OSDMonitor::prepare_alive(MOSDAlive *m)
+bool OSDMonitor::prepare_alive(MonOpRequestRef op)
{
+ MOSDAlive *m = static_cast<MOSDAlive*>(op->get_req());
int from = m->get_orig_source().num();
if (0) { // we probably don't care much about these
dout(7) << "prepare_alive want up_thru " << m->want << " have " << m->version
<< " from " << m->get_orig_source_inst() << dendl;
pending_inc.new_up_thru[from] = m->version; // set to the latest map the OSD has
- wait_for_finished_proposal(new C_ReplyMap(this, m, m->version));
+ wait_for_finished_proposal(new C_ReplyMap(this, op, m->version));
return true;
}
-void OSDMonitor::_reply_map(PaxosServiceMessage *m, epoch_t e)
+void OSDMonitor::_reply_map(MonOpRequestRef op, epoch_t e)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
dout(7) << "_reply_map " << e
<< " from " << m->get_orig_source_inst()
<< dendl;
// -------------
// pg_temp changes
-bool OSDMonitor::preprocess_pgtemp(MOSDPGTemp *m)
+bool OSDMonitor::preprocess_pgtemp(MonOpRequestRef op)
{
+ MOSDPGTemp *m = static_cast<MOSDPGTemp*>(op->get_req());
dout(10) << "preprocess_pgtemp " << *m << dendl;
vector<int> empty;
int from = m->get_orig_source().num();
goto ignore;
dout(7) << "preprocess_pgtemp e" << m->map_epoch << " no changes from " << m->get_orig_source_inst() << dendl;
- _reply_map(m, m->map_epoch);
+ _reply_map(op, m->map_epoch);
return true;
ignore:
return true;
}
-bool OSDMonitor::prepare_pgtemp(MOSDPGTemp *m)
+bool OSDMonitor::prepare_pgtemp(MonOpRequestRef op)
{
+ MOSDPGTemp *m = static_cast<MOSDPGTemp*>(op->get_req());
int from = m->get_orig_source().num();
dout(7) << "prepare_pgtemp e" << m->map_epoch << " from " << m->get_orig_source_inst() << dendl;
for (map<pg_t,vector<int32_t> >::iterator p = m->pg_temp.begin(); p != m->pg_temp.end(); ++p) {
pending_inc.new_primary_temp[p->first] = -1;
}
pending_inc.new_up_thru[from] = m->map_epoch; // set up_thru too, so the osd doesn't have to ask again
- wait_for_finished_proposal(new C_ReplyMap(this, m, m->map_epoch));
+ wait_for_finished_proposal(new C_ReplyMap(this, op, m->map_epoch));
return true;
}
// ---
-bool OSDMonitor::preprocess_remove_snaps(MRemoveSnaps *m)
+bool OSDMonitor::preprocess_remove_snaps(MonOpRequestRef op)
{
+ MRemoveSnaps *m = static_cast<MRemoveSnaps*>(op->get_req());
dout(7) << "preprocess_remove_snaps " << *m << dendl;
// check privilege, ignore if failed
return true;
}
-bool OSDMonitor::prepare_remove_snaps(MRemoveSnaps *m)
+bool OSDMonitor::prepare_remove_snaps(MonOpRequestRef op)
{
+ MRemoveSnaps *m = static_cast<MRemoveSnaps*>(op->get_req());
dout(7) << "prepare_remove_snaps " << *m << dendl;
for (map<int, vector<snapid_t> >::iterator p = m->snaps.begin();
}
-bool OSDMonitor::preprocess_command(MMonCommand *m)
+bool OSDMonitor::preprocess_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
int r = 0;
bufferlist rdata;
stringstream ss, ds;
}
-int OSDMonitor::prepare_new_pool(MPoolOp *m)
+int OSDMonitor::prepare_new_pool(MonOpRequestRef op)
{
+ MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
dout(10) << "prepare_new_pool from " << m->get_connection() << dendl;
MonSession *session = m->get_session();
if (!session)
return 0;
}
-bool OSDMonitor::prepare_set_flag(MMonCommand *m, int flag)
+bool OSDMonitor::prepare_set_flag(MonOpRequestRef op, int flag)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
ostringstream ss;
if (pending_inc.new_flags < 0)
pending_inc.new_flags = osdmap.get_flags();
pending_inc.new_flags |= flag;
ss << "set " << OSDMap::get_flag_string(flag);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(),
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(),
get_last_committed() + 1));
return true;
}
-bool OSDMonitor::prepare_unset_flag(MMonCommand *m, int flag)
+bool OSDMonitor::prepare_unset_flag(MonOpRequestRef op, int flag)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
ostringstream ss;
if (pending_inc.new_flags < 0)
pending_inc.new_flags = osdmap.get_flags();
pending_inc.new_flags &= ~flag;
ss << "unset " << OSDMap::get_flag_string(flag);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(),
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(),
get_last_committed() + 1));
return true;
}
return 0;
}
-bool OSDMonitor::prepare_command(MMonCommand *m)
+bool OSDMonitor::prepare_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
stringstream ss;
map<string, cmd_vartype> cmdmap;
if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
return true;
}
- return prepare_command_impl(m, cmdmap);
+ return prepare_command_impl(op, cmdmap);
}
-bool OSDMonitor::prepare_command_impl(MMonCommand *m,
+bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
map<string,cmd_vartype> &cmdmap)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
bool ret = false;
stringstream ss;
string rs;
ss << action << " item id " << osdid << " name '" << name << "' weight "
<< weight << " at location " << loc << " to crush map";
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
ss << "create-or-move updating item name '" << name << "' weight " << weight
<< " at location " << loc << " to crush map";
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
pending_inc.crush.clear();
newcrush.encode(pending_inc.crush);
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
err = 0;
}
}
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, err, ss.str(),
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, err, ss.str(),
get_last_committed() + 1));
return true;
} else if (prefix == "osd crush rm" ||
err = 0;
ss << "device '" << name << "' does not appear in the crush map";
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
newcrush.encode(pending_inc.crush);
ss << "removed item id " << id << " name '" << name << "' from crush map";
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
newcrush.encode(pending_inc.crush);
ss << "reweighted crush hierarchy";
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if (prefix == "osd crush reweight") {
ss << "reweighted item id " << id << " name '" << name << "' to " << w
<< " in crush map";
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if (prefix == "osd crush reweight-subtree") {
ss << "reweighted subtree id " << id << " name '" << name << "' to " << w
<< " in crush map";
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if (prefix == "osd crush tunables") {
newcrush.encode(pending_inc.crush);
ss << "adjusted tunables profile to " << profile;
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if (prefix == "osd crush set-tunable") {
newcrush.encode(pending_inc.crush);
ss << "adjusted tunable " << tunable << " to " << value;
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
newcrush.encode(pending_inc.crush);
}
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else {
}
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
newcrush.encode(pending_inc.crush);
}
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
pending_inc.new_max_osd = newmax;
ss << "set new max_osd = " << pending_inc.new_max_osd;
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if (prefix == "osd pause") {
- return prepare_set_flag(m, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
+ return prepare_set_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
} else if (prefix == "osd unpause") {
- return prepare_unset_flag(m, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
+ return prepare_unset_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
} else if (prefix == "osd set") {
string key;
cmd_getval(g_ceph_context, cmdmap, "key", key);
if (key == "full")
- return prepare_set_flag(m, CEPH_OSDMAP_FULL);
+ return prepare_set_flag(op, CEPH_OSDMAP_FULL);
else if (key == "pause")
- return prepare_set_flag(m, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
+ return prepare_set_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
else if (key == "noup")
- return prepare_set_flag(m, CEPH_OSDMAP_NOUP);
+ return prepare_set_flag(op, CEPH_OSDMAP_NOUP);
else if (key == "nodown")
- return prepare_set_flag(m, CEPH_OSDMAP_NODOWN);
+ return prepare_set_flag(op, CEPH_OSDMAP_NODOWN);
else if (key == "noout")
- return prepare_set_flag(m, CEPH_OSDMAP_NOOUT);
+ return prepare_set_flag(op, CEPH_OSDMAP_NOOUT);
else if (key == "noin")
- return prepare_set_flag(m, CEPH_OSDMAP_NOIN);
+ return prepare_set_flag(op, CEPH_OSDMAP_NOIN);
else if (key == "nobackfill")
- return prepare_set_flag(m, CEPH_OSDMAP_NOBACKFILL);
+ return prepare_set_flag(op, CEPH_OSDMAP_NOBACKFILL);
else if (key == "norebalance")
- return prepare_set_flag(m, CEPH_OSDMAP_NOREBALANCE);
+ return prepare_set_flag(op, CEPH_OSDMAP_NOREBALANCE);
else if (key == "norecover")
- return prepare_set_flag(m, CEPH_OSDMAP_NORECOVER);
+ return prepare_set_flag(op, CEPH_OSDMAP_NORECOVER);
else if (key == "noscrub")
- return prepare_set_flag(m, CEPH_OSDMAP_NOSCRUB);
+ return prepare_set_flag(op, CEPH_OSDMAP_NOSCRUB);
else if (key == "nodeep-scrub")
- return prepare_set_flag(m, CEPH_OSDMAP_NODEEP_SCRUB);
+ return prepare_set_flag(op, CEPH_OSDMAP_NODEEP_SCRUB);
else if (key == "notieragent")
- return prepare_set_flag(m, CEPH_OSDMAP_NOTIERAGENT);
+ return prepare_set_flag(op, CEPH_OSDMAP_NOTIERAGENT);
else {
ss << "unrecognized flag '" << key << "'";
err = -EINVAL;
string key;
cmd_getval(g_ceph_context, cmdmap, "key", key);
if (key == "full")
- return prepare_unset_flag(m, CEPH_OSDMAP_FULL);
+ return prepare_unset_flag(op, CEPH_OSDMAP_FULL);
else if (key == "pause")
- return prepare_unset_flag(m, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
+ return prepare_unset_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
else if (key == "noup")
- return prepare_unset_flag(m, CEPH_OSDMAP_NOUP);
+ return prepare_unset_flag(op, CEPH_OSDMAP_NOUP);
else if (key == "nodown")
- return prepare_unset_flag(m, CEPH_OSDMAP_NODOWN);
+ return prepare_unset_flag(op, CEPH_OSDMAP_NODOWN);
else if (key == "noout")
- return prepare_unset_flag(m, CEPH_OSDMAP_NOOUT);
+ return prepare_unset_flag(op, CEPH_OSDMAP_NOOUT);
else if (key == "noin")
- return prepare_unset_flag(m, CEPH_OSDMAP_NOIN);
+ return prepare_unset_flag(op, CEPH_OSDMAP_NOIN);
else if (key == "nobackfill")
- return prepare_unset_flag(m, CEPH_OSDMAP_NOBACKFILL);
+ return prepare_unset_flag(op, CEPH_OSDMAP_NOBACKFILL);
else if (key == "norebalance")
- return prepare_unset_flag(m, CEPH_OSDMAP_NOREBALANCE);
+ return prepare_unset_flag(op, CEPH_OSDMAP_NOREBALANCE);
else if (key == "norecover")
- return prepare_unset_flag(m, CEPH_OSDMAP_NORECOVER);
+ return prepare_unset_flag(op, CEPH_OSDMAP_NORECOVER);
else if (key == "noscrub")
- return prepare_unset_flag(m, CEPH_OSDMAP_NOSCRUB);
+ return prepare_unset_flag(op, CEPH_OSDMAP_NOSCRUB);
else if (key == "nodeep-scrub")
- return prepare_unset_flag(m, CEPH_OSDMAP_NODEEP_SCRUB);
+ return prepare_unset_flag(op, CEPH_OSDMAP_NODEEP_SCRUB);
else if (key == "notieragent")
- return prepare_unset_flag(m, CEPH_OSDMAP_NOTIERAGENT);
+ return prepare_unset_flag(op, CEPH_OSDMAP_NOTIERAGENT);
else {
ss << "unrecognized flag '" << key << "'";
err = -EINVAL;
}
if (any) {
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, err, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, err, rs,
get_last_committed() + 1));
return true;
}
pending_inc.new_primary_affinity[id] = ww;
ss << "set osd." << id << " primary-affinity to " << w << " (" << ios::hex << ww << ios::dec << ")";
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
pending_inc.new_weight[id] = ww;
ss << "reweighted osd." << id << " to " << w << " (" << ios::hex << ww << ios::dec << ")";
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
pending_inc.new_lost[id] = e;
ss << "marked osd lost in epoch " << e;
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
}
if (pending_inc.new_state.count(id)) {
// osd is about to exist
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
return true;
}
i = id;
}
if (pending_inc.identify_osd(uuid) >= 0) {
// osd is about to exist
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
return true;
}
if (i >= 0) {
ss << i;
rdata.append(ss);
}
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, rdata,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, rdata,
get_last_committed() + 1));
return true;
pending_inc.new_blacklist[addr] = expires;
ss << "blacklisting " << addr << " until " << expires << " (" << d << " sec)";
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if (blacklistop == "rm") {
pending_inc.new_blacklist.erase(addr);
ss << "un-blacklisting " << addr;
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
ss << "created pool " << poolstr << " snap " << snapname;
}
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if (prefix == "osd pool rmsnap") {
ss << "already removed pool " << poolstr << " snap " << snapname;
}
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if (prefix == "osd pool create") {
int ruleset;
err = get_crush_ruleset(ruleset_name, &ruleset, &ss);
if (err == -EAGAIN) {
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
return true;
}
if (err)
ss << "pool '" << poolstr << "' already exists";
break;
case -EAGAIN:
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
return true;
case -ERANGE:
goto reply;
ss << "pool '" << poolstr << "' created";
}
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
err = _prepare_remove_pool(pool, &ss);
if (err == -EAGAIN) {
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
return true;
}
if (err < 0)
<< cpp_strerror(ret);
}
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, ret, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, ret, rs,
get_last_committed() + 1));
return true;
goto reply;
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if (prefix == "osd tier add") {
pg_pool_t *np = pending_inc.get_new_pool(pool_id, p);
pg_pool_t *ntp = pending_inc.get_new_pool(tierpool_id, tp);
if (np->tiers.count(tierpool_id) || ntp->is_tier()) {
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
return true;
}
np->tiers.insert(tierpool_id);
np->set_snap_epoch(pending_inc.epoch); // tier will update to our snap info
ntp->tier_of = pool_id;
ss << "pool '" << tierpoolstr << "' is now (or already was) a tier of '" << poolstr << "'";
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(),
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(),
get_last_committed() + 1));
return true;
} else if (prefix == "osd tier remove") {
if (np->tiers.count(tierpool_id) == 0 ||
ntp->tier_of != pool_id ||
np->read_tier == tierpool_id) {
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
return true;
}
np->tiers.erase(tierpool_id);
ntp->clear_tier();
ss << "pool '" << tierpoolstr << "' is now (or already was) not a tier of '" << poolstr << "'";
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(),
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(),
get_last_committed() + 1));
return true;
} else if (prefix == "osd tier set-overlay") {
ss << "overlay for '" << poolstr << "' is now (or already was) '" << overlaypoolstr << "'";
if (overlay_p->cache_mode == pg_pool_t::CACHEMODE_NONE)
ss <<" (WARNING: overlay pool cache_mode is still NONE)";
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(),
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(),
get_last_committed() + 1));
return true;
} else if (prefix == "osd tier remove-overlay") {
np->clear_write_tier();
np->last_force_op_resend = pending_inc.epoch;
ss << "there is now (or already was) no overlay for '" << poolstr << "'";
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(),
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(),
get_last_committed() + 1));
return true;
} else if (prefix == "osd tier cache-mode") {
base_pool->write_tier == pool_id)
ss <<" (WARNING: pool is still configured as read or write tier)";
}
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(),
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(),
get_last_committed() + 1));
return true;
} else if (prefix == "osd tier add-cache") {
pg_pool_t *np = pending_inc.get_new_pool(pool_id, p);
pg_pool_t *ntp = pending_inc.get_new_pool(tierpool_id, tp);
if (np->tiers.count(tierpool_id) || ntp->is_tier()) {
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
return true;
}
np->tiers.insert(tierpool_id);
ntp->hit_set_params = hsp;
ntp->target_max_bytes = size;
ss << "pool '" << tierpoolstr << "' is now (or already was) a cache tier of '" << poolstr << "'";
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(),
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(),
get_last_committed() + 1));
return true;
} else if (prefix == "osd pool set-quota") {
}
ss << "set-quota " << field << " = " << value << " for pool " << poolstr;
rs = ss.str();
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else {
ss << "SUCCESSFUL reweight-by-utilization: " << out_str;
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
} else {
ss << "SUCCESSFUL reweight-by-pg: " << out_str;
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
}
update:
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
wait:
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
return true;
}
-bool OSDMonitor::preprocess_pool_op(MPoolOp *m)
+bool OSDMonitor::preprocess_pool_op(MonOpRequestRef op)
{
+ MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
if (m->op == POOL_OP_CREATE)
- return preprocess_pool_op_create(m);
+ return preprocess_pool_op_create(op);
if (!osdmap.get_pg_pool(m->pool)) {
dout(10) << "attempt to delete non-existent pool id " << m->pool << dendl;
- _pool_op_reply(m, 0, osdmap.get_epoch());
+ _pool_op_reply(op, 0, osdmap.get_epoch());
return true;
}
switch (m->op) {
case POOL_OP_CREATE_SNAP:
if (p->is_unmanaged_snaps_mode()) {
- _pool_op_reply(m, -EINVAL, osdmap.get_epoch());
+ _pool_op_reply(op, -EINVAL, osdmap.get_epoch());
return true;
}
if (snap_exists) {
- _pool_op_reply(m, 0, osdmap.get_epoch());
+ _pool_op_reply(op, 0, osdmap.get_epoch());
return true;
}
return false;
case POOL_OP_CREATE_UNMANAGED_SNAP:
if (p->is_pool_snaps_mode()) {
- _pool_op_reply(m, -EINVAL, osdmap.get_epoch());
+ _pool_op_reply(op, -EINVAL, osdmap.get_epoch());
return true;
}
return false;
case POOL_OP_DELETE_SNAP:
if (p->is_unmanaged_snaps_mode()) {
- _pool_op_reply(m, -EINVAL, osdmap.get_epoch());
+ _pool_op_reply(op, -EINVAL, osdmap.get_epoch());
return true;
}
if (!snap_exists) {
- _pool_op_reply(m, 0, osdmap.get_epoch());
+ _pool_op_reply(op, 0, osdmap.get_epoch());
return true;
}
return false;
case POOL_OP_DELETE_UNMANAGED_SNAP:
if (p->is_pool_snaps_mode()) {
- _pool_op_reply(m, -EINVAL, osdmap.get_epoch());
+ _pool_op_reply(op, -EINVAL, osdmap.get_epoch());
return true;
}
if (p->is_removed_snap(m->snapid)) {
- _pool_op_reply(m, 0, osdmap.get_epoch());
+ _pool_op_reply(op, 0, osdmap.get_epoch());
return true;
}
return false;
case POOL_OP_DELETE:
if (osdmap.lookup_pg_pool_name(m->name.c_str()) >= 0) {
- _pool_op_reply(m, 0, osdmap.get_epoch());
+ _pool_op_reply(op, 0, osdmap.get_epoch());
return true;
}
return false;
return false;
}
-bool OSDMonitor::preprocess_pool_op_create(MPoolOp *m)
+bool OSDMonitor::preprocess_pool_op_create(MonOpRequestRef op)
{
+ MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
MonSession *session = m->get_session();
if (!session) {
- _pool_op_reply(m, -EPERM, osdmap.get_epoch());
+ _pool_op_reply(op, -EPERM, osdmap.get_epoch());
return true;
}
if (!session->is_capable("osd", MON_CAP_W)) {
dout(5) << "attempt to create new pool without sufficient auid privileges!"
<< "message: " << *m << std::endl
<< "caps: " << session->caps << dendl;
- _pool_op_reply(m, -EPERM, osdmap.get_epoch());
+ _pool_op_reply(op, -EPERM, osdmap.get_epoch());
return true;
}
int64_t pool = osdmap.lookup_pg_pool_name(m->name.c_str());
if (pool >= 0) {
- _pool_op_reply(m, 0, osdmap.get_epoch());
+ _pool_op_reply(op, 0, osdmap.get_epoch());
return true;
}
return false;
}
-bool OSDMonitor::prepare_pool_op(MPoolOp *m)
+bool OSDMonitor::prepare_pool_op(MonOpRequestRef op)
{
+ MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
dout(10) << "prepare_pool_op " << *m << dendl;
if (m->op == POOL_OP_CREATE) {
- return prepare_pool_op_create(m);
+ return prepare_pool_op_create(op);
} else if (m->op == POOL_OP_DELETE) {
- return prepare_pool_op_delete(m);
+ return prepare_pool_op_delete(op);
}
int ret = 0;
bool changed = false;
if (!osdmap.have_pg_pool(m->pool)) {
- _pool_op_reply(m, -ENOENT, osdmap.get_epoch());
+ _pool_op_reply(op, -ENOENT, osdmap.get_epoch());
return false;
}
} else {
ret = -EINVAL;
}
- _pool_op_reply(m, ret, osdmap.get_epoch());
+ _pool_op_reply(op, ret, osdmap.get_epoch());
return false;
case POOL_OP_DELETE_UNMANAGED_SNAP:
// we won't allow removal of an unmanaged snapshot from a pool
// not in unmanaged snaps mode.
if (!pool->is_unmanaged_snaps_mode()) {
- _pool_op_reply(m, -ENOTSUP, osdmap.get_epoch());
+ _pool_op_reply(op, -ENOTSUP, osdmap.get_epoch());
return false;
}
/* fall-thru */
// but we will allow creating an unmanaged snapshot on any pool
// as long as it is not in 'pool' snaps mode.
if (pool->is_pool_snaps_mode()) {
- _pool_op_reply(m, -EINVAL, osdmap.get_epoch());
+ _pool_op_reply(op, -EINVAL, osdmap.get_epoch());
return false;
}
}
}
out:
- wait_for_finished_proposal(new OSDMonitor::C_PoolOp(this, m, ret, pending_inc.epoch, &reply_data));
+ wait_for_finished_proposal(new OSDMonitor::C_PoolOp(this, op, ret, pending_inc.epoch, &reply_data));
return true;
}
-bool OSDMonitor::prepare_pool_op_create(MPoolOp *m)
+bool OSDMonitor::prepare_pool_op_create(MonOpRequestRef op)
{
- int err = prepare_new_pool(m);
- wait_for_finished_proposal(new OSDMonitor::C_PoolOp(this, m, err, pending_inc.epoch));
+ MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
+ int err = prepare_new_pool(op);
+ wait_for_finished_proposal(new OSDMonitor::C_PoolOp(this, op, err, pending_inc.epoch));
return true;
}
return 0;
}
-bool OSDMonitor::prepare_pool_op_delete(MPoolOp *m)
+bool OSDMonitor::prepare_pool_op_delete(MonOpRequestRef op)
{
+ MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
ostringstream ss;
int ret = _prepare_remove_pool(m->pool, &ss);
if (ret == -EAGAIN) {
- wait_for_finished_proposal(new C_RetryMessage(this, m));
+ wait_for_finished_proposal(new C_RetryMessage(this, op));
return true;
}
if (ret < 0)
dout(10) << __func__ << " got " << ret << " " << ss.str() << dendl;
- wait_for_finished_proposal(new OSDMonitor::C_PoolOp(this, m, ret,
+ wait_for_finished_proposal(new OSDMonitor::C_PoolOp(this, op, ret,
pending_inc.epoch));
return true;
}
-void OSDMonitor::_pool_op_reply(MPoolOp *m, int ret, epoch_t epoch, bufferlist *blp)
+void OSDMonitor::_pool_op_reply(MonOpRequestRef op,
+ int ret, epoch_t epoch, bufferlist *blp)
{
+ MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
dout(20) << "_pool_op_reply " << ret << dendl;
MPoolOpReply *reply = new MPoolOpReply(m->fsid, m->get_tid(),
ret, epoch, get_last_committed(), blp);
#include "erasure-code/ErasureCodeInterface.h"
+#include "common/TrackedOp.h"
+#include "mon/MonOpRequest.h"
+
#define OSD_METADATA_PREFIX "osd_metadata"
/// information about a particular peer's failure reports for one osd
void update_logger();
void handle_query(PaxosServiceMessage *m);
- bool preprocess_query(PaxosServiceMessage *m); // true if processed.
- bool prepare_update(PaxosServiceMessage *m);
+ bool preprocess_query(MonOpRequestRef op); // true if processed.
+ bool prepare_update(MonOpRequestRef op);
bool should_propose(double &delay);
version_t get_trim_to();
bool check_source(PaxosServiceMessage *m, uuid_d fsid);
- bool preprocess_get_osdmap(class MMonGetOSDMap *m);
+ bool preprocess_get_osdmap(MonOpRequestRef op);
- bool preprocess_mark_me_down(class MOSDMarkMeDown *m);
+ bool preprocess_mark_me_down(MonOpRequestRef op);
friend class C_AckMarkedDown;
- bool preprocess_failure(class MOSDFailure *m);
- bool prepare_failure(class MOSDFailure *m);
- bool prepare_mark_me_down(class MOSDMarkMeDown *m);
+ bool preprocess_failure(MonOpRequestRef op);
+ bool prepare_failure(MonOpRequestRef op);
+ bool prepare_mark_me_down(MonOpRequestRef op);
void process_failures();
void take_all_failures(list<MOSDFailure*>& ls);
- bool preprocess_boot(class MOSDBoot *m);
- bool prepare_boot(class MOSDBoot *m);
- void _booted(MOSDBoot *m, bool logit);
+ bool preprocess_boot(MonOpRequestRef op);
+ bool prepare_boot(MonOpRequestRef op);
+ void _booted(MonOpRequestRef op, bool logit);
- bool preprocess_alive(class MOSDAlive *m);
- bool prepare_alive(class MOSDAlive *m);
- void _reply_map(PaxosServiceMessage *m, epoch_t e);
+ bool preprocess_alive(MonOpRequestRef op);
+ bool prepare_alive(MonOpRequestRef op);
+ void _reply_map(MonOpRequestRef op, epoch_t e);
- bool preprocess_pgtemp(class MOSDPGTemp *m);
- bool prepare_pgtemp(class MOSDPGTemp *m);
+ bool preprocess_pgtemp(MonOpRequestRef op);
+ bool prepare_pgtemp(MonOpRequestRef op);
int _check_remove_pool(int64_t pool, const pg_pool_t *pi, ostream *ss);
bool _check_become_tier(
int _prepare_remove_pool(int64_t pool, ostream *ss);
int _prepare_rename_pool(int64_t pool, string newname);
- bool preprocess_pool_op ( class MPoolOp *m);
- bool preprocess_pool_op_create ( class MPoolOp *m);
- bool prepare_pool_op (MPoolOp *m);
- bool prepare_pool_op_create (MPoolOp *m);
- bool prepare_pool_op_delete(MPoolOp *m);
+ bool preprocess_pool_op (MonOpRequestRef op);
+ bool preprocess_pool_op_create (MonOpRequestRef op);
+ bool prepare_pool_op (MonOpRequestRef op);
+ bool prepare_pool_op_create (MonOpRequestRef op);
+ bool prepare_pool_op_delete(MonOpRequestRef op);
int crush_rename_bucket(const string& srcname,
const string& dstname,
ostream *ss);
const unsigned pool_type,
const uint64_t expected_num_objects,
ostream *ss);
- int prepare_new_pool(MPoolOp *m);
+ int prepare_new_pool(MonOpRequestRef op);
void update_pool_flags(int64_t pool_id, uint64_t flags);
bool update_pools_status();
void get_pools_health(list<pair<health_status_t,string> >& summary,
list<pair<health_status_t,string> > *detail) const;
- bool prepare_set_flag(MMonCommand *m, int flag);
- bool prepare_unset_flag(MMonCommand *m, int flag);
+ bool prepare_set_flag(MonOpRequestRef op, int flag);
+ bool prepare_unset_flag(MonOpRequestRef op, int flag);
- void _pool_op_reply(MPoolOp *m, int ret, epoch_t epoch, bufferlist *blp=NULL);
+ void _pool_op_reply(MonOpRequestRef op,
+ int ret, epoch_t epoch, bufferlist *blp=NULL);
struct C_Booted : public Context {
OSDMonitor *cmon;
- MOSDBoot *m;
+ MonOpRequestRef op;
+ // MOSDBoot *m;
bool logit;
- C_Booted(OSDMonitor *cm, MOSDBoot *m_, bool l=true) :
- cmon(cm), m(m_), logit(l) {}
+ C_Booted(OSDMonitor *cm, MonOpRequestRef op_, bool l=true) :
+ cmon(cm), op(op_), logit(l) {}
void finish(int r) {
if (r >= 0)
- cmon->_booted(m, logit);
+ cmon->_booted(op, logit);
else if (r == -ECANCELED)
- m->put();
+ return;
+// m->put();
else if (r == -EAGAIN)
- cmon->dispatch((PaxosServiceMessage*)m);
+ cmon->dispatch(op);
+// cmon->dispatch((PaxosServiceMessage*)m);
else
assert(0 == "bad C_Booted return value");
}
struct C_ReplyMap : public Context {
OSDMonitor *osdmon;
- PaxosServiceMessage *m;
+ MonOpRequestRef op;
+// PaxosServiceMessage *m;
epoch_t e;
- C_ReplyMap(OSDMonitor *o, PaxosServiceMessage *mm, epoch_t ee) : osdmon(o), m(mm), e(ee) {}
+ C_ReplyMap(OSDMonitor *o, MonOpRequestRef op_, epoch_t ee)
+ : osdmon(o), op(op_), e(ee) {}
void finish(int r) {
if (r >= 0)
- osdmon->_reply_map(m, e);
+ osdmon->_reply_map(op, e);
else if (r == -ECANCELED)
- m->put();
+ return;
+ //m->put();
else if (r == -EAGAIN)
- osdmon->dispatch(m);
+ osdmon->dispatch(op);
else
assert(0 == "bad C_ReplyMap return value");
}
};
struct C_PoolOp : public Context {
OSDMonitor *osdmon;
- MPoolOp *m;
+ MonOpRequestRef op;
+// MPoolOp *m;
int replyCode;
int epoch;
bufferlist reply_data;
- C_PoolOp(OSDMonitor * osd, MPoolOp *m_, int rc, int e, bufferlist *rd=NULL) :
- osdmon(osd), m(m_), replyCode(rc), epoch(e) {
+ C_PoolOp(OSDMonitor * osd, MonOpRequestRef op_, int rc, int e, bufferlist *rd=NULL) :
+ osdmon(osd), op(op_), replyCode(rc), epoch(e) {
if (rd)
reply_data = *rd;
}
void finish(int r) {
if (r >= 0)
- osdmon->_pool_op_reply(m, replyCode, epoch, &reply_data);
+ osdmon->_pool_op_reply(op, replyCode, epoch, &reply_data);
else if (r == -ECANCELED)
- m->put();
+ return;
+ //m->put();
else if (r == -EAGAIN)
- osdmon->dispatch(m);
+ osdmon->dispatch(op);
else
assert(0 == "bad C_PoolOp return value");
}
};
- bool preprocess_remove_snaps(struct MRemoveSnaps *m);
- bool prepare_remove_snaps(struct MRemoveSnaps *m);
+ bool preprocess_remove_snaps(MonOpRequestRef op);
+ bool prepare_remove_snaps(MonOpRequestRef op);
+
+ CephContext *cct;
+ OpTracker op_tracker;
int load_metadata(int osd, map<string, string>& m, ostream *err);
public:
- OSDMonitor(Monitor *mn, Paxos *p, string service_name)
+ OSDMonitor(CephContext *cct, Monitor *mn, Paxos *p, string service_name)
: PaxosService(mn, p, service_name),
- thrash_map(0), thrash_last_up_osd(-1) { }
+ thrash_map(0), thrash_last_up_osd(-1),
+ op_tracker(cct, true, 1)
+ { }
void tick(); // check state, take actions
void get_health(list<pair<health_status_t,string> >& summary,
list<pair<health_status_t,string> > *detail) const;
- bool preprocess_command(MMonCommand *m);
- bool prepare_command(MMonCommand *m);
- bool prepare_command_impl(MMonCommand *m, map<string,cmd_vartype> &cmdmap);
+ bool preprocess_command(MonOpRequestRef op);
+ bool prepare_command(MonOpRequestRef op);
+ bool prepare_command_impl(MonOpRequestRef op, map<string,cmd_vartype>& cmdmap);
int set_crash_replay_interval(const int64_t pool_id, const uint32_t cri);
int prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
return 0;
}
-bool PGMonitor::preprocess_query(PaxosServiceMessage *m)
+bool PGMonitor::preprocess_query(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case CEPH_MSG_STATFS:
- handle_statfs(static_cast<MStatfs*>(m));
+ handle_statfs(op);
return true;
case MSG_GETPOOLSTATS:
- return preprocess_getpoolstats(static_cast<MGetPoolStats*>(m));
+ return preprocess_getpoolstats(op);
case MSG_PGSTATS:
- return preprocess_pg_stats(static_cast<MPGStats*>(m));
+ return preprocess_pg_stats(op);
case MSG_MON_COMMAND:
- return preprocess_command(static_cast<MMonCommand*>(m));
+ return preprocess_command(op);
default:
}
}
-bool PGMonitor::prepare_update(PaxosServiceMessage *m)
+bool PGMonitor::prepare_update(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case MSG_PGSTATS:
- return prepare_pg_stats((MPGStats*)m);
+ return prepare_pg_stats(op);
case MSG_MON_COMMAND:
- return prepare_command(static_cast<MMonCommand*>(m));
+ return prepare_command(op);
default:
assert(0);
}
}
-void PGMonitor::handle_statfs(MStatfs *statfs)
+void PGMonitor::handle_statfs(MonOpRequestRef op)
{
+ MStatfs *statfs = static_cast<MStatfs*>(op->get_req());
// check caps
MonSession *session = statfs->get_session();
if (!session)
statfs->put();
}
-bool PGMonitor::preprocess_getpoolstats(MGetPoolStats *m)
+bool PGMonitor::preprocess_getpoolstats(MonOpRequestRef op)
{
+ MGetPoolStats *m = static_cast<MGetPoolStats*>(op->get_req());
MGetPoolStatsReply *reply;
MonSession *session = m->get_session();
}
-bool PGMonitor::preprocess_pg_stats(MPGStats *stats)
+bool PGMonitor::preprocess_pg_stats(MonOpRequestRef op)
{
+ MPGStats *stats = static_cast<MPGStats*>(op->get_req());
// check caps
MonSession *session = stats->get_session();
if (!session) {
return false;
}
-bool PGMonitor::prepare_pg_stats(MPGStats *stats)
+bool PGMonitor::prepare_pg_stats(MonOpRequestRef op)
{
+ MPGStats *stats = static_cast<MPGStats*>(op->get_req());
dout(10) << "prepare_pg_stats " << *stats << " from " << stats->get_orig_source() << dendl;
int from = stats->get_orig_source().num();
// pg stats
MPGStatsAck *ack = new MPGStatsAck;
+ MonOpRequestRef ack_op = mon->op_tracker.create_request<MonOpRequest>(ack);
ack->set_tid(stats->get_tid());
for (map<pg_t,pg_stat_t>::iterator p = stats->pg_stat.begin();
p != stats->pg_stat.end();
*/
}
- wait_for_finished_proposal(new C_Stats(this, stats, ack));
+ wait_for_finished_proposal(new C_Stats(this, op, ack_op));
return true;
}
-void PGMonitor::_updated_stats(MPGStats *req, MPGStatsAck *ack)
+void PGMonitor::_updated_stats(MonOpRequestRef op, MonOpRequestRef ack_op)
{
+ MPGStats *req = static_cast<MPGStats*>(op->get_req());
+ MPGStats *ack = static_cast<MPGStats*>(ack_op->get_req());
dout(7) << "_updated_stats for " << req->get_orig_source_inst() << dendl;
mon->send_reply(req, ack);
req->put();
f->dump_unsigned("pgmap_last_committed", get_last_committed());
}
-bool PGMonitor::preprocess_command(MMonCommand *m)
+bool PGMonitor::preprocess_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
int r = -1;
bufferlist rdata;
stringstream ss, ds;
return true;
}
-bool PGMonitor::prepare_command(MMonCommand *m)
+bool PGMonitor::prepare_command(MonOpRequestRef op)
{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
stringstream ss;
pg_t pgid;
epoch_t epoch = mon->osdmon()->osdmap.get_epoch();
update:
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, r, rs,
+ wait_for_finished_proposal(new Monitor::C_Command(mon, op, r, rs,
get_last_committed() + 1));
return true;
}
void read_pgmap_full();
void apply_pgmap_delta(bufferlist& bl);
- bool preprocess_query(PaxosServiceMessage *m); // true if processed.
- bool prepare_update(PaxosServiceMessage *m);
+ bool preprocess_query(MonOpRequestRef op); // true if processed.
+ bool prepare_update(MonOpRequestRef op);
- bool preprocess_pg_stats(MPGStats *stats);
+ bool preprocess_pg_stats(MonOpRequestRef op);
bool pg_stats_have_changed(int from, const MPGStats *stats) const;
- bool prepare_pg_stats(MPGStats *stats);
- void _updated_stats(MPGStats *req, MPGStatsAck *ack);
+ bool prepare_pg_stats(MonOpRequestRef op);
+ void _updated_stats(MonOpRequestRef op, MonOpRequestRef ack_op);
struct C_Stats : public Context {
PGMonitor *pgmon;
- MPGStats *req;
- MPGStatsAck *ack;
+ MonOpRequestRef stats_op;
+ MonOpRequestRef stats_op_ack;
+// MPGStats *req;
+// MPGStatsAck *ack;
entity_inst_t who;
- C_Stats(PGMonitor *p, MPGStats *r, MPGStatsAck *a) : pgmon(p), req(r), ack(a) {}
+ C_Stats(PGMonitor *p,
+ MonOpRequestRef op,
+ MonOpRequestRef op_ack)
+ : pgmon(p), stats_op(op), stats_op_ack(op_ack) {}
void finish(int r) {
if (r >= 0) {
- pgmon->_updated_stats(req, ack);
+ pgmon->_updated_stats(stats_op, stats_op_ack);
} else if (r == -ECANCELED) {
- req->put();
- ack->put();
+ return;
+// req->put();
+// ack->put();
} else if (r == -EAGAIN) {
- pgmon->dispatch(req);
- ack->put();
+ pgmon->dispatch(stats_op);
+// ack->put();
} else {
assert(0 == "bad C_Stats return value");
}
}
};
- void handle_statfs(MStatfs *statfs);
- bool preprocess_getpoolstats(MGetPoolStats *m);
+ void handle_statfs(MonOpRequestRef op);
+ bool preprocess_getpoolstats(MonOpRequestRef op);
- bool preprocess_command(MMonCommand *m);
- bool prepare_command(MMonCommand *m);
+ bool preprocess_command(MonOpRequestRef op);
+ bool prepare_command(MonOpRequestRef op);
map<int,utime_t> last_sent_pg_create; // per osd throttle
// peon
-void Paxos::handle_collect(MMonPaxos *collect)
+void Paxos::handle_collect(MonOpRequestRef op)
{
+ MMonPaxos *collect = static_cast<MMonPaxos*>(op->get_req());
dout(10) << "handle_collect " << *collect << dendl;
assert(mon->is_peon()); // mon epoch filter should catch strays
// leader
-void Paxos::handle_last(MMonPaxos *last)
+void Paxos::handle_last(MonOpRequestRef op)
{
+ MMonPaxos *last = static_cast<MMonPaxos*>(op->get_req());
bool need_refresh = false;
int from = last->get_source().num();
}
// peon
-void Paxos::handle_begin(MMonPaxos *begin)
+void Paxos::handle_begin(MonOpRequestRef op)
{
+ MMonPaxos *begin = static_cast<MMonPaxos*>(op->get_req());
dout(10) << "handle_begin " << *begin << dendl;
// can we accept this?
}
// leader
-void Paxos::handle_accept(MMonPaxos *accept)
+void Paxos::handle_accept(MonOpRequestRef op)
{
+ MMonPaxos *accept = static_cast<MMonPaxos*>(op->get_req());
dout(10) << "handle_accept " << *accept << dendl;
int from = accept->get_source().num();
}
-void Paxos::handle_commit(MMonPaxos *commit)
+void Paxos::handle_commit(MonOpRequestRef op)
{
+ MMonPaxos *commit = static_cast<MMonPaxos*>(op->get_req());
dout(10) << "handle_commit on " << commit->last_committed << dendl;
logger->inc(l_paxos_commit);
// peon
-void Paxos::handle_lease(MMonPaxos *lease)
+void Paxos::handle_lease(MonOpRequestRef op)
{
+ MMonPaxos *lease = static_cast<MMonPaxos*>(op->get_req());
// sanity
if (!mon->is_peon() ||
last_committed != lease->last_committed) {
lease->put();
}
-void Paxos::handle_lease_ack(MMonPaxos *ack)
+void Paxos::handle_lease_ack(MonOpRequestRef op)
{
+ MMonPaxos *ack = static_cast<MMonPaxos*>(op->get_req());
int from = ack->get_source().num();
if (!lease_ack_timeout_event) {
}
-void Paxos::dispatch(PaxosServiceMessage *m)
+void Paxos::dispatch(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
// election in progress?
if (!mon->is_leader() && !mon->is_peon()) {
dout(5) << "election in progress, dropping " << *m << dendl;
switch (pm->op) {
// learner
case MMonPaxos::OP_COLLECT:
- handle_collect(pm);
+ handle_collect(op);
break;
case MMonPaxos::OP_LAST:
- handle_last(pm);
+ handle_last(op);
break;
case MMonPaxos::OP_BEGIN:
- handle_begin(pm);
+ handle_begin(op);
break;
case MMonPaxos::OP_ACCEPT:
- handle_accept(pm);
+ handle_accept(op);
break;
case MMonPaxos::OP_COMMIT:
- handle_commit(pm);
+ handle_commit(op);
break;
case MMonPaxos::OP_LEASE:
- handle_lease(pm);
+ handle_lease(op);
break;
case MMonPaxos::OP_LEASE_ACK:
- handle_lease_ack(pm);
+ handle_lease_ack(op);
break;
default:
assert(0);
#include <errno.h>
#include "MonitorDBStore.h"
+#include "mon/MonOpRequest.h"
class Monitor;
class MMonPaxos;
*
* @param collect The collect message sent by the Leader to the Peon.
*/
- void handle_collect(MMonPaxos *collect);
+ void handle_collect(MonOpRequestRef op);
/**
* Handle a response from a Peon to the Leader's collect phase.
*
*
* @param last The message sent by the Peon to the Leader.
*/
- void handle_last(MMonPaxos *last);
+ void handle_last(MonOpRequestRef op);
/**
* The Recovery Phase timed out, meaning that a significant part of the
* quorum does not believe we are the Leader, and we thus should trigger new
* Paxos::begin function
*
*/
- void handle_begin(MMonPaxos *begin);
+ void handle_begin(MonOpRequestRef op);
/**
* Handle an Accept message sent by a Peon.
*
* @param accept The message sent by the Peons to the Leader during the
* Paxos::handle_begin function
*/
- void handle_accept(MMonPaxos *accept);
+ void handle_accept(MonOpRequestRef op);
/**
* Trigger a fresh election.
*
* @param commit The message sent by the Leader to the Peon during
* Paxos::commit
*/
- void handle_commit(MMonPaxos *commit);
+ void handle_commit(MonOpRequestRef op);
/**
* Extend the system's lease.
*
* @param lease The message sent by the Leader to the Peon during the
* Paxos::extend_lease function
*/
- void handle_lease(MMonPaxos *lease);
+ void handle_lease(MonOpRequestRef op);
/**
* Account for all the Lease Acks the Leader receives from the Peons.
*
* @param ack The message sent by a Peon to the Leader during the
* Paxos::handle_lease function
*/
- void handle_lease_ack(MMonPaxos *ack);
+ void handle_lease_ack(MonOpRequestRef op);
/**
* Call fresh elections because at least one Peon didn't acked our lease.
*
return paxos_name;
}
- void dispatch(PaxosServiceMessage *m);
+ void dispatch(MonOpRequestRef op);
void read_and_prepare_transactions(MonitorDBStore::TransactionRef tx,
version_t from, version_t last);
#include "include/assert.h"
#include "common/Formatter.h"
+#include "mon/MonOpRequest.h"
+
#define dout_subsys ceph_subsys_paxos
#undef dout_prefix
#define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed())
<< ").paxosservice(" << service_name << " " << fc << ".." << lc << ") ";
}
-bool PaxosService::dispatch(PaxosServiceMessage *m)
+bool PaxosService::dispatch(MonOpRequestRef op)
{
+ PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
+
dout(10) << "dispatch " << *m << " from " << m->get_orig_source_inst() << dendl;
if (mon->is_shutdown()) {
// make sure our map is readable and up to date
if (!is_readable(m->version)) {
dout(10) << " waiting for paxos -> readable (v" << m->version << ")" << dendl;
- wait_for_readable(new C_RetryMessage(this, m), m->version);
+ wait_for_readable(new C_RetryMessage(this, op), m->version);
return true;
}
// preprocess
- if (preprocess_query(m))
+ if (preprocess_query(op))
return true; // easy!
// leader?
// writeable?
if (!is_writeable()) {
dout(10) << " waiting for paxos -> writeable" << dendl;
- wait_for_writeable(new C_RetryMessage(this, m));
+ wait_for_writeable(new C_RetryMessage(this, op));
return true;
}
// update
- if (prepare_update(m)) {
+ if (prepare_update(op)) {
double delay = 0.0;
if (should_propose(delay)) {
if (delay == 0.0) {
*/
class C_RetryMessage : public Context {
PaxosService *svc;
- PaxosServiceMessage *m;
+ MonOpRequestRef op;
+// PaxosServiceMessage *m;
public:
- C_RetryMessage(PaxosService *s, PaxosServiceMessage *m_) : svc(s), m(m_) {}
+ C_RetryMessage(PaxosService *s, MonOpRequestRef op_) : svc(s), op(op_) {}
void finish(int r) {
if (r == -EAGAIN || r >= 0)
- svc->dispatch(m);
+ svc->dispatch(op);
else if (r == -ECANCELED)
- m->put();
+ return;
+// m->put();
else
assert(0 == "bad C_RetryMessage return value");
}
* @param m A message
* @returns 'true' on successful dispatch; 'false' otherwise.
*/
- bool dispatch(PaxosServiceMessage *m);
+ bool dispatch(MonOpRequestRef op);
void refresh(bool *need_bootstrap);
void post_refresh();
* answered, was a state change that has no effect); 'false'
* otherwise.
*/
- virtual bool preprocess_query(PaxosServiceMessage *m) = 0;
+ virtual bool preprocess_query(MonOpRequestRef op) = 0;
/**
* Apply the message to the pending state.
* @returns 'true' if the update message was handled (e.g., a command that
* went through); 'false' otherwise.
*/
- virtual bool prepare_update(PaxosServiceMessage *m) = 0;
+ virtual bool prepare_update(MonOpRequestRef op) = 0;
/**
* @}
*/
return (mon->is_leader() || mon->is_peon());
}
- virtual bool service_dispatch(Message *m) = 0;
+ virtual bool service_dispatch(MonOpRequestRef op) = 0;
virtual void service_tick() = 0;
virtual void service_shutdown() = 0;
return epoch;
}
- bool dispatch(Message *m) {
- return service_dispatch(m);
+ bool dispatch(MonOpRequestRef op) {
+ return service_dispatch(op);
}
void tick() {