From c713d9a632310aeb8818a0f839917aeeaea1c74f Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Thu, 12 Feb 2015 19:41:33 +0000 Subject: [PATCH] mon: optracker (1): support MonOpRequestRef Signed-off-by: Joao Eduardo Luis --- src/mon/AuthMonitor.cc | 54 +++--- src/mon/AuthMonitor.h | 14 +- src/mon/ConfigKeyService.cc | 7 +- src/mon/ConfigKeyService.h | 2 +- src/mon/DataHealthService.cc | 8 +- src/mon/DataHealthService.h | 8 +- src/mon/Elector.cc | 43 +++-- src/mon/Elector.h | 13 +- src/mon/HealthMonitor.cc | 10 +- src/mon/HealthMonitor.h | 2 +- src/mon/HealthService.h | 6 +- src/mon/LogMonitor.cc | 33 ++-- src/mon/LogMonitor.h | 22 +-- src/mon/MDSMonitor.cc | 61 +++--- src/mon/MDSMonitor.h | 36 ++-- src/mon/Monitor.cc | 241 ++++++++++++----------- src/mon/Monitor.h | 80 ++++---- src/mon/MonmapMonitor.cc | 28 +-- src/mon/MonmapMonitor.h | 12 +- src/mon/OSDMonitor.cc | 357 +++++++++++++++++++---------------- src/mon/OSDMonitor.h | 113 ++++++----- src/mon/PGMonitor.cc | 45 +++-- src/mon/PGMonitor.h | 40 ++-- src/mon/Paxos.cc | 38 ++-- src/mon/Paxos.h | 17 +- src/mon/PaxosService.cc | 14 +- src/mon/PaxosService.h | 16 +- src/mon/QuorumService.h | 6 +- 28 files changed, 738 insertions(+), 588 deletions(-) diff --git a/src/mon/AuthMonitor.cc b/src/mon/AuthMonitor.cc index ee83f9dcaadd1..01bd0a6cacd03 100644 --- a/src/mon/AuthMonitor.cc +++ b/src/mon/AuthMonitor.cc @@ -277,15 +277,16 @@ version_t AuthMonitor::get_trim_to() return 0; } -bool AuthMonitor::preprocess_query(PaxosServiceMessage *m) +bool AuthMonitor::preprocess_query(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { case MSG_MON_COMMAND: - return preprocess_command((MMonCommand*)m); + return preprocess_command(op); case CEPH_MSG_AUTH: - return prep_auth((MAuth *)m, false); + return prep_auth(op, false); case MSG_MON_GLOBAL_ID: return false; @@ -297,16 +298,17 @@ bool AuthMonitor::preprocess_query(PaxosServiceMessage *m) } } -bool AuthMonitor::prepare_update(PaxosServiceMessage *m) +bool AuthMonitor::prepare_update(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { case MSG_MON_COMMAND: - return prepare_command((MMonCommand*)m); + return prepare_command(op); case MSG_MON_GLOBAL_ID: - return prepare_global_id((MMonGlobalID*)m); + return prepare_global_id(op); case CEPH_MSG_AUTH: - return prep_auth((MAuth *)m, true); + return prep_auth(op, true); default: assert(0); m->put(); @@ -314,8 +316,9 @@ bool AuthMonitor::prepare_update(PaxosServiceMessage *m) } } -uint64_t AuthMonitor::assign_global_id(MAuth *m, bool should_increase_max) +uint64_t AuthMonitor::assign_global_id(MonOpRequestRef op, bool should_increase_max) { + MAuth *m = static_cast(op->get_req()); int total_mon = mon->monmap->size(); dout(10) << "AuthMonitor::assign_global_id m=" << *m << " mon=" << mon->rank << "/" << total_mon << " last_allocated=" << last_allocated_id << " max_global_id=" << max_global_id << dendl; @@ -352,8 +355,9 @@ uint64_t AuthMonitor::assign_global_id(MAuth *m, bool should_increase_max) } -bool AuthMonitor::prep_auth(MAuth *m, bool paxos_writable) +bool AuthMonitor::prep_auth(MonOpRequestRef op, bool paxos_writable) { + MAuth *m = static_cast(op->get_req()); dout(10) << "prep_auth() blob_size=" << m->get_auth_payload().length() << dendl; MonSession *s = (MonSession *)m->get_connection()->get_priv(); @@ -439,7 +443,7 @@ bool AuthMonitor::prep_auth(MAuth *m, bool paxos_writable) request. If a client tries to send it later, it'll screw up its auth session */ if (!s->global_id) { - s->global_id = assign_global_id(m, paxos_writable); + s->global_id = assign_global_id(op, paxos_writable); if (!s->global_id) { delete s->auth_handler; @@ -447,7 +451,7 @@ bool AuthMonitor::prep_auth(MAuth *m, bool paxos_writable) if (mon->is_leader() && paxos_writable) { dout(10) << "increasing global id, waitlisting message" << dendl; - wait_for_active(new C_RetryMessage(this, m)); + wait_for_active(new C_RetryMessage(this, op)); goto done; } @@ -459,7 +463,7 @@ bool AuthMonitor::prep_auth(MAuth *m, bool paxos_writable) MMonGlobalID *req = new MMonGlobalID(); req->old_max_id = max_global_id; mon->messenger->send_message(req, mon->monmap->get_inst(leader)); - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); return true; } @@ -486,7 +490,7 @@ bool AuthMonitor::prep_auth(MAuth *m, bool paxos_writable) ret = s->auth_handler->handle_request(indata, response_bl, s->global_id, caps_info, &auid); } if (ret == -EIO) { - wait_for_active(new C_RetryMessage(this,m)); + wait_for_active(new C_RetryMessage(this,op)); goto done; } if (caps_info.caps.length()) { @@ -515,8 +519,9 @@ done: return true; } -bool AuthMonitor::preprocess_command(MMonCommand *m) +bool AuthMonitor::preprocess_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); int r = -1; bufferlist rdata; stringstream ss, ds; @@ -662,8 +667,9 @@ void AuthMonitor::import_keyring(KeyRing& keyring) } } -bool AuthMonitor::prepare_command(MMonCommand *m) +bool AuthMonitor::prepare_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); stringstream ss, ds; bufferlist rdata; string rs; @@ -729,7 +735,7 @@ bool AuthMonitor::prepare_command(MMonCommand *m) ss << "imported keyring"; getline(ss, rs); err = 0; - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else if (prefix == "auth add" && !entity_name.empty()) { @@ -767,7 +773,7 @@ bool AuthMonitor::prepare_command(MMonCommand *m) if (inc.op == KeyServerData::AUTH_INC_ADD && inc.name == entity) { wait_for_finished_proposal( - new Monitor::C_Command(mon, m, 0, rs, get_last_committed() + 1)); + new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } } @@ -852,7 +858,7 @@ bool AuthMonitor::prepare_command(MMonCommand *m) ss << "added key for " << auth_inc.name; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else if ((prefix == "auth get-or-create-key" || @@ -910,7 +916,7 @@ bool AuthMonitor::prepare_command(MMonCommand *m) ::decode(auth_inc, q); if (auth_inc.op == KeyServerData::AUTH_INC_ADD && auth_inc.name == entity) { - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } @@ -946,7 +952,7 @@ bool AuthMonitor::prepare_command(MMonCommand *m) rdata.append(ds); getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, rdata, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, rdata, get_last_committed() + 1)); return true; } else if (prefix == "auth caps" && !entity_name.empty()) { @@ -974,7 +980,7 @@ bool AuthMonitor::prepare_command(MMonCommand *m) ss << "updated caps for " << auth_inc.name; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else if (prefix == "auth del" && !entity_name.empty()) { @@ -990,7 +996,7 @@ bool AuthMonitor::prepare_command(MMonCommand *m) ss << "updated"; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } @@ -1002,12 +1008,12 @@ done: return false; } -bool AuthMonitor::prepare_global_id(MMonGlobalID *m) +bool AuthMonitor::prepare_global_id(MonOpRequestRef op) { dout(10) << "AuthMonitor::prepare_global_id" << dendl; increase_max_global_id(); - m->put(); + //m->put(); return true; } diff --git a/src/mon/AuthMonitor.h b/src/mon/AuthMonitor.h index d66de24985f3c..442029027183b 100644 --- a/src/mon/AuthMonitor.h +++ b/src/mon/AuthMonitor.h @@ -143,21 +143,21 @@ private: void create_initial(); void update_from_paxos(bool *need_bootstrap); void create_pending(); // prepare a new pending - bool prepare_global_id(MMonGlobalID *m); + bool prepare_global_id(MonOpRequestRef op); void increase_max_global_id(); - uint64_t assign_global_id(MAuth *m, bool should_increase_max); + uint64_t assign_global_id(MonOpRequestRef op, bool should_increase_max); // propose pending update to peers void encode_pending(MonitorDBStore::TransactionRef t); virtual void encode_full(MonitorDBStore::TransactionRef t); version_t get_trim_to(); - bool preprocess_query(PaxosServiceMessage *m); // true if processed. - bool prepare_update(PaxosServiceMessage *m); + bool preprocess_query(MonOpRequestRef op); // true if processed. + bool prepare_update(MonOpRequestRef op); - bool prep_auth(MAuth *m, bool paxos_writable); + bool prep_auth(MonOpRequestRef op, bool paxos_writable); - bool preprocess_command(MMonCommand *m); - bool prepare_command(MMonCommand *m); + bool preprocess_command(MonOpRequestRef op); + bool prepare_command(MonOpRequestRef op); bool check_rotate(); public: diff --git a/src/mon/ConfigKeyService.cc b/src/mon/ConfigKeyService.cc index 97126ed0d1f0e..70e7858e29e2a 100644 --- a/src/mon/ConfigKeyService.cc +++ b/src/mon/ConfigKeyService.cc @@ -88,8 +88,9 @@ void ConfigKeyService::store_list(stringstream &ss) } -bool ConfigKeyService::service_dispatch(Message *m) +bool ConfigKeyService::service_dispatch(MonOpRequestRef op) { + Message *m = op->get_req(); dout(10) << __func__ << " " << *m << dendl; if (!in_quorum()) { dout(1) << __func__ << " not in quorum -- ignore message" << dendl; @@ -154,7 +155,7 @@ bool ConfigKeyService::service_dispatch(Message *m) } // we'll reply to the message once the proposal has been handled store_put(key, data, - new Monitor::C_Command(mon, cmd, 0, "value stored", 0)); + new Monitor::C_Command(mon, op, 0, "value stored", 0)); // return for now; we'll put the message once it's done. return true; @@ -169,7 +170,7 @@ bool ConfigKeyService::service_dispatch(Message *m) ss << "no such key '" << key << "'"; goto out; } - store_delete(key, new Monitor::C_Command(mon, cmd, 0, "key deleted", 0)); + store_delete(key, new Monitor::C_Command(mon, op, 0, "key deleted", 0)); // return for now; we'll put the message once it's done return true; diff --git a/src/mon/ConfigKeyService.h b/src/mon/ConfigKeyService.h index e33070b65bc3e..0ceface9b0ef6 100644 --- a/src/mon/ConfigKeyService.h +++ b/src/mon/ConfigKeyService.h @@ -55,7 +55,7 @@ public: virtual void get_health(Formatter *f, list >& summary, list > *detail) { } - virtual bool service_dispatch(Message *m); + virtual bool service_dispatch(MonOpRequestRef op); virtual void start_epoch() { } virtual void finish_epoch() { } diff --git a/src/mon/DataHealthService.cc b/src/mon/DataHealthService.cc index ef1e0e5babf28..522a409f29c29 100644 --- a/src/mon/DataHealthService.cc +++ b/src/mon/DataHealthService.cc @@ -230,16 +230,18 @@ void DataHealthService::service_tick() } } -void DataHealthService::handle_tell(MMonHealth *m) +void DataHealthService::handle_tell(MonOpRequestRef op) { + MMonHealth *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; assert(m->get_service_op() == MMonHealth::OP_TELL); stats[m->get_source_inst()] = m->data_stats; } -bool DataHealthService::service_dispatch(MMonHealth *m) +bool DataHealthService::service_dispatch_op(MonOpRequestRef op) { + MMonHealth *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; assert(m->get_service_type() == get_type()); if (!in_quorum()) { @@ -251,7 +253,7 @@ bool DataHealthService::service_dispatch(MMonHealth *m) switch (m->service_op) { case MMonHealth::OP_TELL: // someone is telling us their stats - handle_tell(m); + handle_tell(op); break; default: dout(0) << __func__ << " unknown op " << m->service_op << dendl; diff --git a/src/mon/DataHealthService.h b/src/mon/DataHealthService.h index 221e17947a8e4..a986d18fe989a 100644 --- a/src/mon/DataHealthService.h +++ b/src/mon/DataHealthService.h @@ -33,7 +33,7 @@ class DataHealthService : map stats; int last_warned_percent; - void handle_tell(MMonHealth *m); + void handle_tell(MonOpRequestRef op); int update_store_stats(DataStats &ours); int update_stats(); void share_stats(); @@ -45,11 +45,7 @@ class DataHealthService : protected: virtual void service_tick(); - virtual bool service_dispatch(Message *m) { - assert(0 == "We should never reach this; only the function below"); - return false; - } - virtual bool service_dispatch(MMonHealth *m); + virtual bool service_dispatch_op(MonOpRequestRef op); virtual void service_shutdown() { } virtual void start_epoch(); diff --git a/src/mon/Elector.cc b/src/mon/Elector.cc index 6f54cb4bbc700..f4da7dc155937 100644 --- a/src/mon/Elector.cc +++ b/src/mon/Elector.cc @@ -207,8 +207,9 @@ void Elector::victory() } -void Elector::handle_propose(MMonElection *m) +void Elector::handle_propose(MonOpRequestRef op) { + MMonElection *m = static_cast(op->get_req()); dout(5) << "handle_propose from " << m->get_source() << dendl; int from = m->get_source().num(); @@ -221,7 +222,7 @@ void Elector::handle_propose(MMonElection *m) required_features) { dout(5) << " ignoring propose from mon" << from << " without required features" << dendl; - nak_old_peer(m); + nak_old_peer(op); return; } else if (m->epoch > epoch) { bump_epoch(m->epoch); @@ -267,8 +268,9 @@ void Elector::handle_propose(MMonElection *m) m->put(); } -void Elector::handle_ack(MMonElection *m) +void Elector::handle_ack(MonOpRequestRef op) { + MMonElection *m = static_cast(op->get_req()); dout(5) << "handle_ack from " << m->get_source() << dendl; int from = m->get_source().num(); @@ -311,8 +313,9 @@ void Elector::handle_ack(MMonElection *m) } -void Elector::handle_victory(MMonElection *m) +void Elector::handle_victory(MonOpRequestRef op) { + MMonElection *m = static_cast(op->get_req()); dout(5) << "handle_victory from " << m->get_source() << " quorum_features " << m->quorum_features << dendl; int from = m->get_source().num(); @@ -355,8 +358,9 @@ void Elector::handle_victory(MMonElection *m) m->put(); } -void Elector::nak_old_peer(MMonElection *m) +void Elector::nak_old_peer(MonOpRequestRef op) { + MMonElection *m = static_cast(op->get_req()); uint64_t supported_features = m->get_connection()->get_features(); if (supported_features & CEPH_FEATURE_OSDMAP_ENC) { @@ -374,8 +378,9 @@ void Elector::nak_old_peer(MMonElection *m) m->put(); } -void Elector::handle_nak(MMonElection *m) +void Elector::handle_nak(MonOpRequestRef op) { + MMonElection *m = static_cast(op->get_req()); dout(1) << "handle_nak from " << m->get_source() << " quorum_features " << m->quorum_features << dendl; @@ -391,9 +396,9 @@ void Elector::handle_nak(MMonElection *m) // the end! } -void Elector::dispatch(Message *m) +void Elector::dispatch(MonOpRequestRef op) { - switch (m->get_type()) { + switch (op->get_req()->get_type()) { case MSG_MON_ELECTION: { @@ -401,14 +406,14 @@ void Elector::dispatch(Message *m) m->put(); return; } - if (m->get_source().num() >= mon->monmap->size()) { + if (op->get_req()->get_source().num() >= mon->monmap->size()) { dout(5) << " ignoring bogus election message with bad mon rank " - << m->get_source() << dendl; m->put(); + << op->get_req()->get_source() << dendl; return; } - MMonElection *em = static_cast(m); + MMonElection *em = static_cast(op->get_req()); // assume an old message encoding would have matched if (em->fsid != mon->monmap->fsid) { @@ -418,8 +423,8 @@ void Elector::dispatch(Message *m) return; } - if (!mon->monmap->contains(m->get_source_addr())) { - dout(1) << "discarding election message: " << m->get_source_addr() + if (!mon->monmap->contains(em->get_source_addr())) { + dout(1) << "discarding election message: " << em->get_source_addr() << " not in my monmap " << *mon->monmap << dendl; m->put(); return; @@ -428,7 +433,7 @@ void Elector::dispatch(Message *m) MonMap *peermap = new MonMap; peermap->decode(em->monmap_bl); if (peermap->epoch > mon->monmap->epoch) { - dout(0) << m->get_source_inst() << " has newer monmap epoch " << peermap->epoch + dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap->epoch << " > my epoch " << mon->monmap->epoch << ", taking it" << dendl; @@ -445,7 +450,7 @@ void Elector::dispatch(Message *m) return; } if (peermap->epoch < mon->monmap->epoch) { - dout(0) << m->get_source_inst() << " has older monmap epoch " << peermap->epoch + dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap->epoch << " < my epoch " << mon->monmap->epoch << dendl; } @@ -453,7 +458,7 @@ void Elector::dispatch(Message *m) switch (em->op) { case MMonElection::OP_PROPOSE: - handle_propose(em); + handle_propose(op); return; } @@ -465,13 +470,13 @@ void Elector::dispatch(Message *m) switch (em->op) { case MMonElection::OP_ACK: - handle_ack(em); + handle_ack(op); return; case MMonElection::OP_VICTORY: - handle_victory(em); + handle_victory(op); return; case MMonElection::OP_NAK: - handle_nak(em); + handle_nak(op); return; default: assert(0); diff --git a/src/mon/Elector.h b/src/mon/Elector.h index f2ac66cc3fb8c..ab84d0bf878a6 100644 --- a/src/mon/Elector.h +++ b/src/mon/Elector.h @@ -25,6 +25,7 @@ using namespace std; #include "include/Context.h" #include "common/Timer.h" +#include "mon/MonOpRequest.h" class Monitor; @@ -270,7 +271,7 @@ class Elector { * * @param m A message sent by another participant in the quorum. */ - void handle_propose(class MMonElection *m); + void handle_propose(MonOpRequestRef op); /** * Handle a message from some other participant Acking us as the Leader. * @@ -293,7 +294,7 @@ class Elector { * * @param m A message with an operation type of OP_ACK */ - void handle_ack(class MMonElection *m); + void handle_ack(MonOpRequestRef op); /** * Handle a message from some other participant declaring Victory. * @@ -314,7 +315,7 @@ class Elector { * * @param m A message with an operation type of OP_VICTORY */ - void handle_victory(class MMonElection *m); + void handle_victory(MonOpRequestRef op); /** * Send a nak to a peer who's out of date, containing information about why. * @@ -326,7 +327,7 @@ class Elector { * @param m A message from a monitor not supporting required features. We * take ownership of the reference. */ - void nak_old_peer(class MMonElection *m); + void nak_old_peer(MonOpRequestRef op); /** * Handle a message from some other participant declaring * we cannot join the quorum. @@ -339,7 +340,7 @@ class Elector { * * @param m A message with an operation type of OP_NAK */ - void handle_nak(class MMonElection *m); + void handle_nak(MonOpRequestRef op); public: /** @@ -398,7 +399,7 @@ class Elector { * * @param m A received message */ - void dispatch(Message *m); + void dispatch(MonOpRequestRef op); /** * Call an election. diff --git a/src/mon/HealthMonitor.cc b/src/mon/HealthMonitor.cc index 7cba39bf17cbb..926934c7f644d 100644 --- a/src/mon/HealthMonitor.cc +++ b/src/mon/HealthMonitor.cc @@ -53,18 +53,18 @@ void HealthMonitor::init() } } -bool HealthMonitor::service_dispatch(Message *m) +bool HealthMonitor::service_dispatch(MonOpRequestRef op) { - assert(m->get_type() == MSG_MON_HEALTH); - MMonHealth *hm = (MMonHealth*)m; + assert(op->get_req()->get_type() == MSG_MON_HEALTH); + MMonHealth *hm = static_cast(op->get_req()); int service_type = hm->get_service_type(); if (services.count(service_type) == 0) { dout(1) << __func__ << " service type " << service_type << " not registered -- drop message!" << dendl; - m->put(); + //m->put(); return false; } - return services[service_type]->service_dispatch(hm); + return services[service_type]->service_dispatch(op); } void HealthMonitor::service_shutdown() diff --git a/src/mon/HealthMonitor.h b/src/mon/HealthMonitor.h index 3d842615c936a..43898762f064f 100644 --- a/src/mon/HealthMonitor.h +++ b/src/mon/HealthMonitor.h @@ -45,7 +45,7 @@ public: virtual void get_health(Formatter *f, list >& summary, list > *detail); - virtual bool service_dispatch(Message *m); + virtual bool service_dispatch(MonOpRequestRef op); virtual void start_epoch() { for (map::iterator it = services.begin(); diff --git a/src/mon/HealthService.h b/src/mon/HealthService.h index 2a46f882b8db8..7b3d7acfacad0 100644 --- a/src/mon/HealthService.h +++ b/src/mon/HealthService.h @@ -30,11 +30,11 @@ struct HealthService : public QuorumService HealthService(Monitor *m) : QuorumService(m) { } virtual ~HealthService() { } - virtual bool service_dispatch(Message *m) { - return service_dispatch(static_cast(m)); + virtual bool service_dispatch(MonOpRequestRef op) { + return service_dispatch_op(op); } - virtual bool service_dispatch(MMonHealth *m) = 0; + virtual bool service_dispatch_op(MonOpRequestRef op) = 0; public: virtual void get_health(Formatter *f, diff --git a/src/mon/LogMonitor.cc b/src/mon/LogMonitor.cc index b364874d4975f..4e8d66f79b791 100644 --- a/src/mon/LogMonitor.cc +++ b/src/mon/LogMonitor.cc @@ -264,15 +264,16 @@ version_t LogMonitor::get_trim_to() return 0; } -bool LogMonitor::preprocess_query(PaxosServiceMessage *m) +bool LogMonitor::preprocess_query(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { case MSG_MON_COMMAND: - return preprocess_command(static_cast(m)); + return preprocess_command(op); case MSG_LOG: - return preprocess_log((MLog*)m); + return preprocess_log(op); default: assert(0); @@ -281,14 +282,15 @@ bool LogMonitor::preprocess_query(PaxosServiceMessage *m) } } -bool LogMonitor::prepare_update(PaxosServiceMessage *m) +bool LogMonitor::prepare_update(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { case MSG_MON_COMMAND: - return prepare_command(static_cast(m)); + return prepare_command(op); case MSG_LOG: - return prepare_log((MLog*)m); + return prepare_log(op); default: assert(0); m->put(); @@ -296,8 +298,9 @@ bool LogMonitor::prepare_update(PaxosServiceMessage *m) } } -bool LogMonitor::preprocess_log(MLog *m) +bool LogMonitor::preprocess_log(MonOpRequestRef op) { + MLog *m = static_cast(op->get_req()); dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl; int num_new = 0; @@ -328,8 +331,9 @@ bool LogMonitor::preprocess_log(MLog *m) return true; } -bool LogMonitor::prepare_log(MLog *m) +bool LogMonitor::prepare_log(MonOpRequestRef op) { + MLog *m = static_cast(op->get_req()); dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl; if (m->fsid != mon->monmap->fsid) { @@ -348,12 +352,13 @@ bool LogMonitor::prepare_log(MLog *m) pending_log.insert(pair(p->stamp, *p)); } } - wait_for_finished_proposal(new C_Log(this, m)); + wait_for_finished_proposal(new C_Log(this, op)); return true; } -void LogMonitor::_updated_log(MLog *m) +void LogMonitor::_updated_log(MonOpRequestRef op) { + MLog *m = static_cast(op->get_req()); dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl; mon->send_reply(m, new MLogAck(m->fsid, m->entries.rbegin()->seq)); @@ -372,8 +377,9 @@ bool LogMonitor::should_propose(double& delay) } -bool LogMonitor::preprocess_command(MMonCommand *m) +bool LogMonitor::preprocess_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); int r = -1; bufferlist rdata; stringstream ss; @@ -388,8 +394,9 @@ bool LogMonitor::preprocess_command(MMonCommand *m) } -bool LogMonitor::prepare_command(MMonCommand *m) +bool LogMonitor::prepare_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); stringstream ss; string rs; int err = -EINVAL; @@ -422,7 +429,7 @@ bool LogMonitor::prepare_command(MMonCommand *m) le.msg = str_join(logtext, " "); pending_summary.add(le); pending_log.insert(pair(le.stamp, le)); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, string(), + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, string(), get_last_committed() + 1)); return true; } diff --git a/src/mon/LogMonitor.h b/src/mon/LogMonitor.h index 4cbeb6ec41ffe..a298d52a10396 100644 --- a/src/mon/LogMonitor.h +++ b/src/mon/LogMonitor.h @@ -116,12 +116,12 @@ private: void encode_pending(MonitorDBStore::TransactionRef t); virtual void encode_full(MonitorDBStore::TransactionRef t); version_t get_trim_to(); - bool preprocess_query(PaxosServiceMessage *m); // true if processed. - bool prepare_update(PaxosServiceMessage *m); + bool preprocess_query(MonOpRequestRef op); // true if processed. + bool prepare_update(MonOpRequestRef op); - bool preprocess_log(MLog *m); - bool prepare_log(MLog *m); - void _updated_log(MLog *m); + bool preprocess_log(MonOpRequestRef op); + bool prepare_log(MonOpRequestRef op); + void _updated_log(MonOpRequestRef op); bool should_propose(double& delay); @@ -132,20 +132,18 @@ private: struct C_Log : public Context { LogMonitor *logmon; - MLog *ack; - C_Log(LogMonitor *p, MLog *a) : logmon(p), ack(a) {} + MonOpRequestRef op; + C_Log(LogMonitor *p, MonOpRequestRef o) : logmon(p), op(o) {} void finish(int r) { if (r == -ECANCELED) { - if (ack) - ack->put(); return; } - logmon->_updated_log(ack); + logmon->_updated_log(op); } }; - bool preprocess_command(MMonCommand *m); - bool prepare_command(MMonCommand *m); + bool preprocess_command(MonOpRequestRef op); + bool prepare_command(MonOpRequestRef op); bool _create_sub_summary(MLog *mlog, int level); void _create_sub_incremental(MLog *mlog, int level, version_t sv); diff --git a/src/mon/MDSMonitor.cc b/src/mon/MDSMonitor.cc index 0c123c69d3d8e..60c28ac7239d4 100644 --- a/src/mon/MDSMonitor.cc +++ b/src/mon/MDSMonitor.cc @@ -212,20 +212,21 @@ void MDSMonitor::update_logger() mon->cluster_logger->set(l_cluster_mds_epoch, mdsmap.get_epoch()); } -bool MDSMonitor::preprocess_query(PaxosServiceMessage *m) +bool MDSMonitor::preprocess_query(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { case MSG_MDS_BEACON: - return preprocess_beacon(static_cast(m)); + return preprocess_beacon(op); case MSG_MON_COMMAND: - return preprocess_command(static_cast(m)); + return preprocess_command(op); case MSG_MDS_OFFLOAD_TARGETS: - return preprocess_offload_targets(static_cast(m)); + return preprocess_offload_targets(op); default: assert(0); @@ -244,8 +245,9 @@ void MDSMonitor::_note_beacon(MMDSBeacon *m) last_beacon[gid].seq = seq; } -bool MDSMonitor::preprocess_beacon(MMDSBeacon *m) +bool MDSMonitor::preprocess_beacon(MonOpRequestRef op) { + MMDSBeacon *m = static_cast(op->get_req()); MDSMap::DaemonState state = m->get_state(); mds_gid_t gid = m->get_global_id(); version_t seq = m->get_seq(); @@ -374,8 +376,9 @@ bool MDSMonitor::preprocess_beacon(MMDSBeacon *m) return true; } -bool MDSMonitor::preprocess_offload_targets(MMDSLoadTargets* m) +bool MDSMonitor::preprocess_offload_targets(MonOpRequestRef op) { + MMDSLoadTargets *m = static_cast(op->get_req()); dout(10) << "preprocess_offload_targets " << *m << " from " << m->get_orig_source() << dendl; mds_gid_t gid; @@ -402,20 +405,21 @@ bool MDSMonitor::preprocess_offload_targets(MMDSLoadTargets* m) } -bool MDSMonitor::prepare_update(PaxosServiceMessage *m) +bool MDSMonitor::prepare_update(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); dout(7) << "prepare_update " << *m << dendl; switch (m->get_type()) { case MSG_MDS_BEACON: - return prepare_beacon(static_cast(m)); + return prepare_beacon(op); case MSG_MON_COMMAND: - return prepare_command(static_cast(m)); + return prepare_command(op); case MSG_MDS_OFFLOAD_TARGETS: - return prepare_offload_targets(static_cast(m)); + return prepare_offload_targets(op); default: assert(0); @@ -427,8 +431,9 @@ bool MDSMonitor::prepare_update(PaxosServiceMessage *m) -bool MDSMonitor::prepare_beacon(MMDSBeacon *m) +bool MDSMonitor::prepare_beacon(MonOpRequestRef op) { + MMDSBeacon *m = static_cast(op->get_req()); // -- this is an update -- dout(12) << "prepare_beacon " << *m << " from " << m->get_orig_source_inst() << dendl; entity_addr_t addr = m->get_orig_source_inst().addr; @@ -453,7 +458,7 @@ bool MDSMonitor::prepare_beacon(MMDSBeacon *m) bool failed_mds = false; while (mds_gid_t existing = pending_mdsmap.find_mds_gid_by_name(m->get_name())) { if (!mon->osdmon()->is_writeable()) { - mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, m)); + mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, op)); return false; } fail_mds_gid(existing); @@ -562,7 +567,7 @@ bool MDSMonitor::prepare_beacon(MMDSBeacon *m) if (!mon->osdmon()->is_writeable()) { dout(4) << __func__ << ": DAMAGED from rank " << info.rank << " waiting for osdmon writeable to blacklist it" << dendl; - mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, m)); + mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, op)); return false; } @@ -614,13 +619,14 @@ bool MDSMonitor::prepare_beacon(MMDSBeacon *m) dout(7) << "prepare_beacon pending map now:" << dendl; print_map(pending_mdsmap); - wait_for_finished_proposal(new C_Updated(this, m)); + wait_for_finished_proposal(new C_Updated(this, op)); return true; } -bool MDSMonitor::prepare_offload_targets(MMDSLoadTargets *m) +bool MDSMonitor::prepare_offload_targets(MonOpRequestRef op) { + MMDSLoadTargets *m = static_cast(op->get_req()); mds_gid_t gid = m->global_id; if (pending_mdsmap.mds_info.count(gid)) { dout(10) << "prepare_offload_targets " << gid << " " << m->targets << dendl; @@ -638,8 +644,9 @@ bool MDSMonitor::should_propose(double& delay) return PaxosService::should_propose(delay); } -void MDSMonitor::_updated(MMDSBeacon *m) +void MDSMonitor::_updated(MonOpRequestRef op) { + MMDSBeacon *m = static_cast(op->get_req()); dout(10) << "_updated " << m->get_orig_source() << " " << *m << dendl; mon->clog->info() << m->get_orig_source_inst() << " " << ceph_mds_state_name(m->get_state()) << "\n"; @@ -725,8 +732,9 @@ void MDSMonitor::dump_info(Formatter *f) f->dump_unsigned("mdsmap_last_committed", get_last_committed()); } -bool MDSMonitor::preprocess_command(MMonCommand *m) +bool MDSMonitor::preprocess_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); int r = -1; bufferlist rdata; stringstream ss, ds; @@ -1043,8 +1051,9 @@ int MDSMonitor::fail_mds(std::ostream &ss, const std::string &arg) return 0; } -bool MDSMonitor::prepare_command(MMonCommand *m) +bool MDSMonitor::prepare_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); int r = -EINVAL; stringstream ss; bufferlist rdata; @@ -1067,7 +1076,7 @@ bool MDSMonitor::prepare_command(MMonCommand *m) } /* Execute filesystem add/remove, or pass through to filesystem_command */ - r = management_command(m, prefix, cmdmap, ss); + r = management_command(op, prefix, cmdmap, ss); if (r >= 0) goto out; @@ -1087,7 +1096,7 @@ bool MDSMonitor::prepare_command(MMonCommand *m) ss << "No filesystem configured: use `ceph fs new` to create a filesystem"; r = -ENOENT; } else { - r = filesystem_command(m, prefix, cmdmap, ss); + r = filesystem_command(op, prefix, cmdmap, ss); if (r < 0 && r == -EAGAIN) { // Do not reply, the message has been enqueued for retry return false; @@ -1101,7 +1110,7 @@ out: if (r >= 0) { // success.. delay reply - wait_for_finished_proposal(new Monitor::C_Command(mon, m, r, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, r, rs, get_last_committed() + 1)); return true; } else { @@ -1175,11 +1184,12 @@ int MDSMonitor::_check_pool( * @retval < 0 An error has occurred; **ss** may have been set. */ int MDSMonitor::management_command( - MMonCommand *m, + MonOpRequestRef op, std::string const &prefix, map &cmdmap, std::stringstream &ss) { + MMonCommand *m = static_cast(op->get_req()); if (prefix == "mds newfs") { /* Legacy `newfs` command, takes pool numbers instead of * names, assumes fs name to be MDS_FS_NAME_DEFAULT, and @@ -1298,7 +1308,7 @@ int MDSMonitor::management_command( // propose. We thus need to make sure the osdmon is writeable before // we do this, waiting if it's not. if (!mon->osdmon()->is_writeable()) { - mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, m)); + mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, op)); return -EAGAIN; } @@ -1416,11 +1426,12 @@ int MDSMonitor::management_command( * @retval < 0 An error has occurred; **ss** may have been set. */ int MDSMonitor::filesystem_command( - MMonCommand *m, + MonOpRequestRef op, std::string const &prefix, map &cmdmap, std::stringstream &ss) { + MMonCommand *m = static_cast(op->get_req()); int r = 0; string whostr; cmd_getval(g_ceph_context, cmdmap, "who", whostr); @@ -1575,7 +1586,7 @@ int MDSMonitor::filesystem_command( cmd_getval(g_ceph_context, cmdmap, "who", who); r = fail_mds(ss, who); if (r < 0 && r == -EAGAIN) { - mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, m)); + mon->osdmon()->wait_for_writeable(new C_RetryMessage(this, op)); return -EAGAIN; // don't propose yet; wait for message to be retried } diff --git a/src/mon/MDSMonitor.h b/src/mon/MDSMonitor.h index e3d5bd3d5b165..714ed34a87320 100644 --- a/src/mon/MDSMonitor.h +++ b/src/mon/MDSMonitor.h @@ -51,18 +51,18 @@ class MDSMonitor : public PaxosService { class C_Updated : public Context { MDSMonitor *mm; - MMDSBeacon *m; + MonOpRequestRef op; public: - C_Updated(MDSMonitor *a, MMDSBeacon *c) : - mm(a), m(c) {} + C_Updated(MDSMonitor *a, MonOpRequestRef c) : + mm(a), op(c) {} void finish(int r) { if (r >= 0) - mm->_updated(m); // success + mm->_updated(op); // success else if (r == -ECANCELED) { - mm->mon->no_reply(m); - m->put(); + mm->mon->no_reply(op->get_req()); +// m->put(); } else { - mm->dispatch((PaxosServiceMessage*)m); // try again + mm->dispatch(op); // try again } } }; @@ -82,35 +82,35 @@ class MDSMonitor : public PaxosService { void update_logger(); - void _updated(MMDSBeacon *m); + void _updated(MonOpRequestRef op); - bool preprocess_query(PaxosServiceMessage *m); // true if processed. - bool prepare_update(PaxosServiceMessage *m); + bool preprocess_query(MonOpRequestRef op); // true if processed. + bool prepare_update(MonOpRequestRef op); bool should_propose(double& delay); void on_active(); void _note_beacon(class MMDSBeacon *m); - bool preprocess_beacon(class MMDSBeacon *m); - bool prepare_beacon(class MMDSBeacon *m); + bool preprocess_beacon(MonOpRequestRef op); + bool prepare_beacon(MonOpRequestRef op); - bool preprocess_offload_targets(MMDSLoadTargets *m); - bool prepare_offload_targets(MMDSLoadTargets *m); + bool preprocess_offload_targets(MonOpRequestRef op); + bool prepare_offload_targets(MonOpRequestRef op); void get_health(list >& summary, list > *detail) const; int fail_mds(std::ostream &ss, const std::string &arg); void fail_mds_gid(mds_gid_t gid); - bool preprocess_command(MMonCommand *m); - bool prepare_command(MMonCommand *m); + bool preprocess_command(MonOpRequestRef op); + bool prepare_command(MonOpRequestRef op); int management_command( - MMonCommand *m, + MonOpRequestRef op, std::string const &prefix, map &cmdmap, std::stringstream &ss); int filesystem_command( - MMonCommand *m, + MonOpRequestRef op, std::string const &prefix, map &cmdmap, std::stringstream &ss); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index b7bb061ff9ef8..a036ac43cbdbb 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -191,7 +191,8 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, admin_hook(NULL), health_tick_event(NULL), health_interval_event(NULL), - routed_request_tid(0) + routed_request_tid(0), + op_tracker(cct, true, 1) { rank = -1; @@ -204,7 +205,7 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap"); paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap"); - paxos_service[PAXOS_OSDMAP] = new OSDMonitor(this, paxos, "osdmap"); + paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap"); paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap"); paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm"); paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth"); @@ -1226,8 +1227,9 @@ void Monitor::sync_finish(version_t last_committed) bootstrap(); } -void Monitor::handle_sync(MMonSync *m) +void Monitor::handle_sync(MonOpRequestRef op) { + MMonSync *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; switch (m->op) { @@ -1235,24 +1237,24 @@ void Monitor::handle_sync(MMonSync *m) case MMonSync::OP_GET_COOKIE_FULL: case MMonSync::OP_GET_COOKIE_RECENT: - handle_sync_get_cookie(m); + handle_sync_get_cookie(op); break; case MMonSync::OP_GET_CHUNK: - handle_sync_get_chunk(m); + handle_sync_get_chunk(op); break; // client ----------- case MMonSync::OP_COOKIE: - handle_sync_cookie(m); + handle_sync_cookie(op); break; case MMonSync::OP_CHUNK: case MMonSync::OP_LAST_CHUNK: - handle_sync_chunk(m); + handle_sync_chunk(op); break; case MMonSync::OP_NO_COOKIE: - handle_sync_no_cookie(m); + handle_sync_no_cookie(op); break; default: @@ -1264,16 +1266,18 @@ void Monitor::handle_sync(MMonSync *m) // leader -void Monitor::_sync_reply_no_cookie(MMonSync *m) +void Monitor::_sync_reply_no_cookie(MonOpRequestRef op) { + MMonSync *m = static_cast(op->get_req()); MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie); m->get_connection()->send_message(reply); } -void Monitor::handle_sync_get_cookie(MMonSync *m) +void Monitor::handle_sync_get_cookie(MonOpRequestRef op) { + MMonSync *m = static_cast(op->get_req()); if (is_synchronizing()) { - _sync_reply_no_cookie(m); + _sync_reply_no_cookie(op); return; } @@ -1322,13 +1326,14 @@ void Monitor::handle_sync_get_cookie(MMonSync *m) m->get_connection()->send_message(reply); } -void Monitor::handle_sync_get_chunk(MMonSync *m) +void Monitor::handle_sync_get_chunk(MonOpRequestRef op) { + MMonSync *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; if (sync_providers.count(m->cookie) == 0) { dout(10) << __func__ << " no cookie " << m->cookie << dendl; - _sync_reply_no_cookie(m); + _sync_reply_no_cookie(op); return; } @@ -1342,7 +1347,7 @@ void Monitor::handle_sync_get_chunk(MMonSync *m) dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed << " < our fc " << paxos->get_first_committed() << dendl; sync_providers.erase(m->cookie); - _sync_reply_no_cookie(m); + _sync_reply_no_cookie(op); return; } @@ -1389,8 +1394,9 @@ void Monitor::handle_sync_get_chunk(MMonSync *m) // requester -void Monitor::handle_sync_cookie(MMonSync *m) +void Monitor::handle_sync_cookie(MonOpRequestRef op) { + MMonSync *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; if (sync_cookie) { dout(10) << __func__ << " already have a cookie, ignoring" << dendl; @@ -1422,8 +1428,9 @@ void Monitor::sync_get_next_chunk() assert(g_conf->mon_sync_requester_kill_at != 4); } -void Monitor::handle_sync_chunk(MMonSync *m) +void Monitor::handle_sync_chunk(MonOpRequestRef op) { + MMonSync *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; if (m->cookie != sync_cookie) { @@ -1476,7 +1483,7 @@ void Monitor::handle_sync_chunk(MMonSync *m) } } -void Monitor::handle_sync_no_cookie(MMonSync *m) +void Monitor::handle_sync_no_cookie(MonOpRequestRef op) { dout(10) << __func__ << dendl; bootstrap(); @@ -1530,8 +1537,9 @@ void Monitor::probe_timeout(int r) bootstrap(); } -void Monitor::handle_probe(MMonProbe *m) +void Monitor::handle_probe(MonOpRequestRef op) { + MMonProbe *m = static_cast(op->get_req()); dout(10) << "handle_probe " << *m << dendl; if (m->fsid != monmap->fsid) { @@ -1542,11 +1550,11 @@ void Monitor::handle_probe(MMonProbe *m) switch (m->op) { case MMonProbe::OP_PROBE: - handle_probe_probe(m); + handle_probe_probe(op); break; case MMonProbe::OP_REPLY: - handle_probe_reply(m); + handle_probe_reply(op); break; case MMonProbe::OP_MISSING_FEATURES: @@ -1564,8 +1572,9 @@ void Monitor::handle_probe(MMonProbe *m) /** * @todo fix this. This is going to cause trouble. */ -void Monitor::handle_probe_probe(MMonProbe *m) +void Monitor::handle_probe_probe(MonOpRequestRef op) { + MMonProbe *m = static_cast(op->get_req()); MMonProbe *r; dout(10) << "handle_probe_probe " << m->get_source_inst() << *m @@ -1618,8 +1627,9 @@ void Monitor::handle_probe_probe(MMonProbe *m) m->put(); } -void Monitor::handle_probe_reply(MMonProbe *m) +void Monitor::handle_probe_reply(MonOpRequestRef op) { + MMonProbe *m = static_cast(op->get_req()); dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl; dout(10) << " monmap is " << *monmap << dendl; @@ -2542,8 +2552,9 @@ bool Monitor::is_keyring_required() auth_cluster_required == "cephx"; } -void Monitor::handle_command(MMonCommand *m) +void Monitor::handle_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); if (m->fsid != monmap->fsid) { dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl; reply_command(m, -EPERM, "wrong fsid", 0); @@ -2691,16 +2702,16 @@ void Monitor::handle_command(MMonCommand *m) << "cmd=" << m->cmd << ": dispatch"; if (module == "mds" || module == "fs") { - mdsmon()->dispatch(m); + mdsmon()->dispatch(op); return; } if (module == "osd") { - osdmon()->dispatch(m); + osdmon()->dispatch(op); return; } if (module == "pg") { - pgmon()->dispatch(m); + pgmon()->dispatch(op); return; } if (module == "mon" && @@ -2713,20 +2724,20 @@ void Monitor::handle_command(MMonCommand *m) prefix != "mon scrub" && prefix != "mon sync force" && prefix != "mon metadata") { - monmon()->dispatch(m); + monmon()->dispatch(op); return; } if (module == "auth") { - authmon()->dispatch(m); + authmon()->dispatch(op); return; } if (module == "log") { - logmon()->dispatch(m); + logmon()->dispatch(op); return; } if (module == "config-key") { - config_key_service->dispatch(m); + config_key_service->dispatch(op); return; } @@ -2920,7 +2931,7 @@ void Monitor::handle_command(MMonCommand *m) // make sure our map is readable and up to date if (!is_leader() && !is_peon()) { dout(10) << " waiting for quorum" << dendl; - waitfor_quorum.push_back(new C_RetryMessage(this, m)); + waitfor_quorum.push_back(new C_RetryMessage(this, op)); return; } _quorum_status(f.get(), ds); @@ -3091,8 +3102,9 @@ struct AnonConnection : public Connection { }; //extract the original message and put it into the regular dispatch function -void Monitor::handle_forward(MForward *m) +void Monitor::handle_forward(MonOpRequestRef op) { + MForward *m = static_cast(op->get_req()); dout(10) << "received forwarded message from " << m->client << " via " << m->get_source_inst() << dendl; MonSession *session = static_cast(m->get_connection()->get_priv()); @@ -3219,8 +3231,9 @@ void Monitor::no_reply(PaxosServiceMessage *req) session->put(); } -void Monitor::handle_route(MRoute *m) +void Monitor::handle_route(MonOpRequestRef op) { + MRoute *m = static_cast(op->get_req()); MonSession *session = static_cast(m->get_connection()->get_priv()); //check privileges if (session && !session->is_capable("mon", MON_CAP_X)) { @@ -3280,7 +3293,8 @@ void Monitor::resend_routed_requests() if (mon == rank) { dout(10) << " requeue for self tid " << rr->tid << " " << *req << dendl; req->set_connection(rr->con); - retry.push_back(new C_RetryMessage(this, req)); + MonOpRequestRef op = op_tracker.create_request(req); + retry.push_back(new C_RetryMessage(this, op)); delete rr; } else { dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl; @@ -3336,7 +3350,7 @@ void Monitor::send_command(const entity_inst_t& inst, try_send_message(c, inst); } -void Monitor::waitlist_or_zap_client(Message *m) +void Monitor::waitlist_or_zap_client(MonOpRequestRef op) { /** * Wait list the new session until we're in the quorum, assuming it's @@ -3351,13 +3365,14 @@ void Monitor::waitlist_or_zap_client(Message *m) * 3) command messages. We want to accept these under all possible * circumstances. */ - ConnectionRef con = m->get_connection(); + Message *m = op->get_req(); + ConnectionRef con = op->get_connection(); utime_t too_old = ceph_clock_now(g_ceph_context); too_old -= g_ceph_context->_conf->mon_lease; if (m->get_recv_stamp() > too_old && con->is_connected()) { dout(5) << "waitlisting message " << *m << dendl; - maybe_wait_for_quorum.push_back(new C_RetryMessage(this, m)); + maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op)); } else { dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl; con->mark_down(); @@ -3372,18 +3387,22 @@ void Monitor::_ms_dispatch(Message *m) return; } - ConnectionRef connection = m->get_connection(); + MonOpRequestRef op = op_tracker.create_request(m); + dispatch(op); +} + +void Monitor::dispatch(MonOpRequestRef op) +{ + ConnectionRef connection = op->get_connection(); MonSession *s = NULL; MonCap caps; - bool src_is_mon; + bool src_is_mon = op->is_src_mon(); // regardless of who we are or who the sender is, the message must // have a connection associated. If it doesn't then something fishy // is going on. assert(connection); - src_is_mon = (connection->get_peer_type() & CEPH_ENTITY_TYPE_MON); - bool reuse_caps = false; dout(20) << "have connection" << dendl; s = static_cast(connection->get_priv()); @@ -3393,6 +3412,7 @@ void Monitor::_ms_dispatch(Message *m) s->put(); s = NULL; } + Message *m = op->get_req(); if (!s) { // if the sender is not a monitor, make sure their first message for a // session is an MAuth. If it is not, assume it's a stray message, @@ -3403,16 +3423,15 @@ void Monitor::_ms_dispatch(Message *m) m->get_type() != CEPH_MSG_MON_GET_MAP)) { if (m->get_type() == CEPH_MSG_PING) { // let it go through and be dispatched immediately! - return dispatch(s, m, false); + return dispatch_op(op); } dout(1) << __func__ << " dropping stray message " << *m << " from " << m->get_source_inst() << dendl; - m->put(); return; } if (!exited_quorum.is_zero() && !src_is_mon) { - waitlist_or_zap_client(m); + waitlist_or_zap_client(op); return; } @@ -3420,6 +3439,7 @@ void Monitor::_ms_dispatch(Message *m) s = session_map.new_session(m->get_source_inst(), m->get_connection().get()); m->get_connection()->set_priv(s->get()); dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl; + op->set_session(s); logger->set(l_mon_num_sessions, session_map.get_size()); logger->inc(l_mon_session_add); @@ -3442,6 +3462,7 @@ void Monitor::_ms_dispatch(Message *m) } else { dout(20) << "ms_dispatch existing session " << s << " for " << s->inst << dendl; } + op->set_session(s); assert(s); if (s->auth_handler) { @@ -3450,31 +3471,29 @@ void Monitor::_ms_dispatch(Message *m) dout(20) << " caps " << s->caps.get_str() << dendl; if (is_synchronizing() && !src_is_mon) { - waitlist_or_zap_client(m); + waitlist_or_zap_client(op); return; } - dispatch(s, m, src_is_mon); + dispatch_op(op); s->put(); return; } -void Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon) +void Monitor::dispatch_op(MonOpRequestRef op) { - assert(m != NULL); - /* deal with all messages that do not necessarily need caps */ bool dealt_with = true; - switch (m->get_type()) { + switch (op->get_req()->get_type()) { // auth case MSG_MON_GLOBAL_ID: case CEPH_MSG_AUTH: /* no need to check caps here */ - paxos_service[PAXOS_AUTH]->dispatch((PaxosServiceMessage*)m); + paxos_service[PAXOS_AUTH]->dispatch(op); break; case CEPH_MSG_PING: - handle_ping(static_cast(m)); + handle_ping(op); break; /* MMonGetMap may be used by clients to obtain a monmap *before* @@ -3485,11 +3504,11 @@ void Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon) * not authenticate when obtaining a monmap. */ case CEPH_MSG_MON_GET_MAP: - handle_mon_get_map(static_cast(m)); + handle_mon_get_map(op); break; case CEPH_MSG_MON_METADATA: - return handle_mon_metadata(static_cast(m)); + return handle_mon_metadata(op); default: dealt_with = false; @@ -3500,7 +3519,7 @@ void Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon) /* deal with all messages which caps should be checked somewhere else */ dealt_with = true; - switch (m->get_type()) { + switch (op->get_req()->get_type()) { // OSDs case CEPH_MSG_MON_GET_OSDMAP: @@ -3510,13 +3529,13 @@ void Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon) case MSG_OSD_ALIVE: case MSG_OSD_PGTEMP: case MSG_REMOVE_SNAPS: - paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m); + paxos_service[PAXOS_OSDMAP]->dispatch(op); break; // MDSs case MSG_MDS_BEACON: case MSG_MDS_OFFLOAD_TARGETS: - paxos_service[PAXOS_MDSMAP]->dispatch((PaxosServiceMessage*)m); + paxos_service[PAXOS_MDSMAP]->dispatch(op); break; @@ -3524,21 +3543,21 @@ void Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon) case CEPH_MSG_STATFS: case MSG_PGSTATS: case MSG_GETPOOLSTATS: - paxos_service[PAXOS_PGMAP]->dispatch((PaxosServiceMessage*)m); + paxos_service[PAXOS_PGMAP]->dispatch(op); break; case CEPH_MSG_POOLOP: - paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m); + paxos_service[PAXOS_OSDMAP]->dispatch(op); break; // log case MSG_LOG: - paxos_service[PAXOS_LOG]->dispatch((PaxosServiceMessage*)m); + paxos_service[PAXOS_LOG]->dispatch(op); break; // handle_command() does its own caps checking case MSG_MON_COMMAND: - handle_command(static_cast(m)); + handle_command(op); break; default: @@ -3551,24 +3570,24 @@ void Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon) /* messages we, the Monitor class, need to deal with * but may be sent by clients. */ - if (!s->is_capable("mon", MON_CAP_R)) { - dout(5) << __func__ << " " << m->get_source_inst() - << " not enough caps for " << *m << " -- dropping" + if (!op->get_session()->is_capable("mon", MON_CAP_R)) { + dout(5) << __func__ << " " << op->get_req()->get_source_inst() + << " not enough caps for " << *(op->get_req()) << " -- dropping" << dendl; goto drop; } dealt_with = true; - switch (m->get_type()) { + switch (op->get_req()->get_type()) { // misc case CEPH_MSG_MON_GET_VERSION: - handle_get_version(static_cast(m)); + handle_get_version(op); break; case CEPH_MSG_MON_SUBSCRIBE: /* FIXME: check what's being subscribed, filter accordingly */ - handle_subscribe(static_cast(m)); + handle_subscribe(op); break; default: @@ -3578,51 +3597,51 @@ void Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon) if (dealt_with) return; - if (!src_is_mon) { + if (!op->is_src_mon()) { dout(1) << __func__ << " unexpected monitor message from" - << " non-monitor entity " << m->get_source_inst() - << " " << *m << " -- dropping" << dendl; + << " non-monitor entity " << op->get_req()->get_source_inst() + << " " << *(op->get_req()) << " -- dropping" << dendl; goto drop; } /* messages that should only be sent by another monitor */ dealt_with = true; - switch (m->get_type()) { + switch (op->get_req()->get_type()) { case MSG_ROUTE: - handle_route(static_cast(m)); + handle_route(op); break; case MSG_MON_PROBE: - handle_probe(static_cast(m)); + handle_probe(op); break; // Sync (i.e., the new slurp, but on steroids) case MSG_MON_SYNC: - handle_sync(static_cast(m)); + handle_sync(op); break; case MSG_MON_SCRUB: - handle_scrub(static_cast(m)); + handle_scrub(op); break; /* log acks are sent from a monitor we sent the MLog to, and are never sent by clients to us. */ case MSG_LOGACK: - log_client.handle_log_ack((MLogAck*)m); - m->put(); + log_client.handle_log_ack((MLogAck*)op->get_req()); + //m->put(); break; // monmap case MSG_MON_JOIN: - paxos_service[PAXOS_MONMAP]->dispatch((PaxosServiceMessage*)m); + paxos_service[PAXOS_MONMAP]->dispatch(op); break; // paxos case MSG_MON_PAXOS: { - MMonPaxos *pm = static_cast(m); - if (!src_is_mon || - !s->is_capable("mon", MON_CAP_X)) { + MMonPaxos *pm = static_cast(op->get_req()); + if (!op->is_src_mon() || + !op->get_session()->is_capable("mon", MON_CAP_X)) { //can't send these! pm->put(); break; @@ -3648,37 +3667,35 @@ void Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon) break; } - paxos->dispatch((PaxosServiceMessage*)m); + paxos->dispatch(op); } break; // elector messages case MSG_MON_ELECTION: //check privileges here for simplicity - if (s && - !s->is_capable("mon", MON_CAP_X)) { + if (op->get_session() && + !op->get_session()->is_capable("mon", MON_CAP_X)) { dout(0) << "MMonElection received from entity without enough caps!" - << s->caps << dendl; - m->put(); + << op->get_session()->caps << dendl; + //m->put(); break; } if (!is_probing() && !is_synchronizing()) { - elector.dispatch(m); - } else { - m->put(); + elector.dispatch(op); } break; case MSG_FORWARD: - handle_forward(static_cast(m)); + handle_forward(op); break; case MSG_TIMECHECK: - handle_timecheck(static_cast(m)); + handle_timecheck(op); break; case MSG_MON_HEALTH: - health_monitor->dispatch(static_cast(m)); + health_monitor->dispatch(op); break; default: @@ -3686,17 +3703,19 @@ void Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon) break; } if (!dealt_with) { - dout(1) << "dropping unexpected " << *m << dendl; + dout(1) << "dropping unexpected " << *(op->get_req()) << dendl; goto drop; } return; drop: - m->put(); + //m->put(); + return; } -void Monitor::handle_ping(MPing *m) +void Monitor::handle_ping(MonOpRequestRef op) { + MPing *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; MPing *reply = new MPing; entity_inst_t inst = m->get_source_inst(); @@ -3908,8 +3927,9 @@ health_status_t Monitor::timecheck_status(ostringstream &ss, return status; } -void Monitor::handle_timecheck_leader(MTimeCheck *m) +void Monitor::handle_timecheck_leader(MonOpRequestRef op) { + MTimeCheck *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; /* handles PONG's */ assert(m->op == MTimeCheck::OP_PONG); @@ -4030,8 +4050,9 @@ void Monitor::handle_timecheck_leader(MTimeCheck *m) } } -void Monitor::handle_timecheck_peon(MTimeCheck *m) +void Monitor::handle_timecheck_peon(MonOpRequestRef op) { + MTimeCheck *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; assert(is_peon()); @@ -4071,21 +4092,22 @@ void Monitor::handle_timecheck_peon(MTimeCheck *m) m->get_connection()->send_message(reply); } -void Monitor::handle_timecheck(MTimeCheck *m) +void Monitor::handle_timecheck(MonOpRequestRef op) { + MTimeCheck *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; if (is_leader()) { if (m->op != MTimeCheck::OP_PONG) { dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl; } else { - handle_timecheck_leader(m); + handle_timecheck_leader(op); } } else if (is_peon()) { if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) { dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl; } else { - handle_timecheck_peon(m); + handle_timecheck_peon(op); } } else { dout(1) << __func__ << " drop unexpected msg" << dendl; @@ -4093,8 +4115,9 @@ void Monitor::handle_timecheck(MTimeCheck *m) m->put(); } -void Monitor::handle_subscribe(MMonSubscribe *m) +void Monitor::handle_subscribe(MonOpRequestRef op) { + MMonSubscribe *m = static_cast(op->get_req()); dout(10) << "handle_subscribe " << *m << dendl; bool reply = false; @@ -4147,8 +4170,9 @@ void Monitor::handle_subscribe(MMonSubscribe *m) m->put(); } -void Monitor::handle_get_version(MMonGetVersion *m) +void Monitor::handle_get_version(MonOpRequestRef op) { + MMonGetVersion *m = static_cast(op->get_req()); dout(10) << "handle_get_version " << *m << dendl; PaxosService *svc = NULL; @@ -4161,7 +4185,7 @@ void Monitor::handle_get_version(MMonGetVersion *m) if (!is_leader() && !is_peon()) { dout(10) << " waiting for quorum" << dendl; - waitfor_quorum.push_back(new C_RetryMessage(this, m)); + waitfor_quorum.push_back(new C_RetryMessage(this, op)); goto out; } @@ -4177,7 +4201,7 @@ void Monitor::handle_get_version(MMonGetVersion *m) if (svc) { if (!svc->is_readable()) { - svc->wait_for_readable(new C_RetryMessage(this, m)); + svc->wait_for_readable(new C_RetryMessage(this, op)); goto out; } @@ -4258,15 +4282,17 @@ void Monitor::send_latest_monmap(Connection *con) con->send_message(new MMonMap(bl)); } -void Monitor::handle_mon_get_map(MMonGetMap *m) +void Monitor::handle_mon_get_map(MonOpRequestRef op) { + MMonGetMap *m = static_cast(op->get_req()); dout(10) << "handle_mon_get_map" << dendl; send_latest_monmap(m->get_connection().get()); m->put(); } -void Monitor::handle_mon_metadata(MMonMetadata *m) +void Monitor::handle_mon_metadata(MonOpRequestRef op) { + MMonMetadata *m = static_cast(op->get_req()); if (is_leader()) { dout(10) << __func__ << dendl; update_mon_metadata(m->get_source().num(), m->data); @@ -4419,8 +4445,9 @@ int Monitor::scrub() return 0; } -void Monitor::handle_scrub(MMonScrub *m) +void Monitor::handle_scrub(MonOpRequestRef op) { + MMonScrub *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; switch (m->op) { case MMonScrub::OP_SCRUB: diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 28cd11df74cf0..5e26a2fc1be3f 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -54,6 +54,9 @@ #include "include/str_map.h" #include +#include "common/TrackedOp.h" +#include "mon/MonOpRequest.h" + #define CEPH_MON_PROTOCOL 13 /* cluster internal */ @@ -245,7 +248,7 @@ private: */ int scrub_start(); int scrub(); - void handle_scrub(MMonScrub *m); + void handle_scrub(MonOpRequestRef op); bool _scrub(ScrubResult *r, pair *start, int *num_keys); @@ -447,18 +450,18 @@ private: * * @param m Sync message with operation type MMonSync::OP_START_CHUNKS */ - void handle_sync(MMonSync *m); + void handle_sync(MonOpRequestRef op); - void _sync_reply_no_cookie(MMonSync *m); + void _sync_reply_no_cookie(MonOpRequestRef op); - void handle_sync_get_cookie(MMonSync *m); - void handle_sync_get_chunk(MMonSync *m); - void handle_sync_finish(MMonSync *m); + void handle_sync_get_cookie(MonOpRequestRef op); + void handle_sync_get_chunk(MonOpRequestRef op); + void handle_sync_finish(MonOpRequestRef op); - void handle_sync_cookie(MMonSync *m); - void handle_sync_forward(MMonSync *m); - void handle_sync_chunk(MMonSync *m); - void handle_sync_no_cookie(MMonSync *m); + void handle_sync_cookie(MonOpRequestRef op); + void handle_sync_forward(MonOpRequestRef op); + void handle_sync_chunk(MonOpRequestRef op); + void handle_sync_no_cookie(MonOpRequestRef op); /** * @} // Synchronization @@ -523,9 +526,9 @@ private: health_status_t timecheck_status(ostringstream &ss, const double skew_bound, const double latency); - void handle_timecheck_leader(MTimeCheck *m); - void handle_timecheck_peon(MTimeCheck *m); - void handle_timecheck(MTimeCheck *m); + void handle_timecheck_leader(MonOpRequestRef op); + void handle_timecheck_peon(MonOpRequestRef op); + void handle_timecheck(MonOpRequestRef op); /** * @} */ @@ -556,7 +559,7 @@ private: /** * Handle ping messages from others. */ - void handle_ping(MPing *m); + void handle_ping(MonOpRequestRef op); Context *probe_timeout_event; // for probing @@ -671,9 +674,9 @@ public: void send_latest_monmap(Connection *con); // messages - void handle_get_version(MMonGetVersion *m); - void handle_subscribe(MMonSubscribe *m); - void handle_mon_get_map(MMonGetMap *m); + void handle_get_version(MonOpRequestRef op); + void handle_subscribe(MonOpRequestRef op); + void handle_mon_get_map(MonOpRequestRef op); static void _generate_command_map(map& cmdmap, map ¶m_str_map); @@ -686,10 +689,10 @@ public: void get_mon_status(Formatter *f, ostream& ss); void _quorum_status(Formatter *f, ostream& ss); bool _add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss); - void handle_command(class MMonCommand *m); - void handle_route(MRoute *m); + void handle_command(MonOpRequestRef op); + void handle_route(MonOpRequestRef op); - void handle_mon_metadata(MMonMetadata *m); + void handle_mon_metadata(MonOpRequestRef op); int get_mon_metadata(int mon, Formatter *f, ostream& err); int print_nodes(Formatter *f, ostream& err); map metadata; @@ -760,7 +763,7 @@ public: void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata, version_t version); - void handle_probe(MMonProbe *m); + void handle_probe(MonOpRequestRef op); /** * Handle a Probe Operation, replying with our name, quorum and known versions. * @@ -775,8 +778,8 @@ public: * * @param m A Probe message, with an operation of type Probe. */ - void handle_probe_probe(MMonProbe *m); - void handle_probe_reply(MMonProbe *m); + void handle_probe_probe(MonOpRequestRef op); + void handle_probe_reply(MonOpRequestRef op); // request routing struct RoutedRequest { @@ -797,14 +800,14 @@ public: map routed_requests; void forward_request_leader(PaxosServiceMessage *req); - void handle_forward(MForward *m); + void handle_forward(MonOpRequestRef op); void try_send_message(Message *m, const entity_inst_t& to); void send_reply(PaxosServiceMessage *req, Message *reply); void no_reply(PaxosServiceMessage *req); void resend_routed_requests(); void remove_session(MonSession *s); void remove_all_sessions(); - void waitlist_or_zap_client(Message *m); + void waitlist_or_zap_client(MonOpRequestRef op); void send_command(const entity_inst_t& inst, const vector& com); @@ -812,19 +815,20 @@ public: public: struct C_Command : public Context { Monitor *mon; - MMonCommand *m; + MonOpRequestRef op; int rc; string rs; bufferlist rdata; version_t version; - C_Command(Monitor *_mm, MMonCommand *_m, int r, string s, version_t v) : - mon(_mm), m(_m), rc(r), rs(s), version(v){} - C_Command(Monitor *_mm, MMonCommand *_m, int r, string s, bufferlist rd, version_t v) : - mon(_mm), m(_m), rc(r), rs(s), rdata(rd), version(v){} + C_Command(Monitor *_mm, MonOpRequestRef _op, int r, string s, version_t v) : + mon(_mm), op(_op), rc(r), rs(s), version(v){} + C_Command(Monitor *_mm, MonOpRequestRef _op, int r, string s, bufferlist rd, version_t v) : + mon(_mm), op(_op), rc(r), rs(s), rdata(rd), version(v){} void finish(int r) { + MMonCommand *m = static_cast(op->get_req()); if (r >= 0) { ostringstream ss; - if (!m->get_connection()) { + if (!op->get_req()->get_connection()) { ss << "connection dropped for command "; } else { MonSession *s = m->get_session(); @@ -845,7 +849,7 @@ public: else if (r == -ECANCELED) m->put(); else if (r == -EAGAIN) - mon->_ms_dispatch(m); + mon->dispatch_op(op); else assert(0 == "bad C_Command return value"); } @@ -854,14 +858,15 @@ public: private: class C_RetryMessage : public Context { Monitor *mon; + MonOpRequestRef op; Message *msg; public: - C_RetryMessage(Monitor *m, Message *ms) : mon(m), msg(ms) {} + C_RetryMessage(Monitor *m, MonOpRequestRef o) : mon(m), op(o) {} void finish(int r) { if (r == -EAGAIN || r >= 0) - mon->_ms_dispatch(msg); + mon->dispatch_op(op); else if (r == -ECANCELED) - msg->put(); + return; else assert(0 == "bad C_RetryMessage return value"); } @@ -877,7 +882,8 @@ public: return true; } // dissociate message handling from session and connection logic - void dispatch(MonSession *s, Message *m, const bool src_is_mon); + void dispatch(MonOpRequestRef op); + void dispatch_op(MonOpRequestRef op); //mon_caps is used for un-connected messages from monitors MonCap * mon_caps; bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new); @@ -902,6 +908,8 @@ public: void read_features(); void write_features(MonitorDBStore::TransactionRef t); + OpTracker op_tracker; + public: Monitor(CephContext *cct_, string nm, MonitorDBStore *s, Messenger *m, MonMap *map); diff --git a/src/mon/MonmapMonitor.cc b/src/mon/MonmapMonitor.cc index c6b21cb47fc7c..c204447361e94 100644 --- a/src/mon/MonmapMonitor.cc +++ b/src/mon/MonmapMonitor.cc @@ -126,14 +126,15 @@ void MonmapMonitor::on_active() mon->clog->info() << "monmap " << *mon->monmap << "\n"; } -bool MonmapMonitor::preprocess_query(PaxosServiceMessage *m) +bool MonmapMonitor::preprocess_query(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); switch (m->get_type()) { // READs case MSG_MON_COMMAND: - return preprocess_command(static_cast(m)); + return preprocess_command(op); case MSG_MON_JOIN: - return preprocess_join(static_cast(m)); + return preprocess_join(op); default: assert(0); m->put(); @@ -154,8 +155,9 @@ void MonmapMonitor::dump_info(Formatter *f) f->close_section(); } -bool MonmapMonitor::preprocess_command(MMonCommand *m) +bool MonmapMonitor::preprocess_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); int r = -1; bufferlist rdata; stringstream ss; @@ -256,15 +258,16 @@ reply: } -bool MonmapMonitor::prepare_update(PaxosServiceMessage *m) +bool MonmapMonitor::prepare_update(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); dout(7) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { case MSG_MON_COMMAND: - return prepare_command(static_cast(m)); + return prepare_command(op); case MSG_MON_JOIN: - return prepare_join(static_cast(m)); + return prepare_join(op); default: assert(0); m->put(); @@ -273,8 +276,9 @@ bool MonmapMonitor::prepare_update(PaxosServiceMessage *m) return false; } -bool MonmapMonitor::prepare_command(MMonCommand *m) +bool MonmapMonitor::prepare_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); stringstream ss; string rs; int err = -EINVAL; @@ -351,7 +355,7 @@ bool MonmapMonitor::prepare_command(MMonCommand *m) pending_map.add(name, addr); pending_map.last_changed = ceph_clock_now(g_ceph_context); getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; @@ -387,8 +391,9 @@ out: return false; } -bool MonmapMonitor::preprocess_join(MMonJoin *join) +bool MonmapMonitor::preprocess_join(MonOpRequestRef op) { + MMonJoin *join = static_cast(op->get_req()); dout(10) << "preprocess_join " << join->name << " at " << join->addr << dendl; MonSession *session = join->get_session(); @@ -411,8 +416,9 @@ bool MonmapMonitor::preprocess_join(MMonJoin *join) } return false; } -bool MonmapMonitor::prepare_join(MMonJoin *join) +bool MonmapMonitor::prepare_join(MonOpRequestRef op) { + MMonJoin *join = static_cast(op->get_req()); dout(0) << "adding/updating " << join->name << " at " << join->addr << " to monitor cluster" << dendl; if (pending_map.contains(join->name)) pending_map.remove(join->name); diff --git a/src/mon/MonmapMonitor.h b/src/mon/MonmapMonitor.h index 22b51ad64558e..f55409217b5b7 100644 --- a/src/mon/MonmapMonitor.h +++ b/src/mon/MonmapMonitor.h @@ -58,14 +58,14 @@ class MonmapMonitor : public PaxosService { void dump_info(Formatter *f); - bool preprocess_query(PaxosServiceMessage *m); - bool prepare_update(PaxosServiceMessage *m); + bool preprocess_query(MonOpRequestRef op); + bool prepare_update(MonOpRequestRef op); - bool preprocess_join(MMonJoin *m); - bool prepare_join(MMonJoin *m); + bool preprocess_join(MonOpRequestRef op); + bool prepare_join(MonOpRequestRef op); - bool preprocess_command(MMonCommand *m); - bool prepare_command(MMonCommand *m); + bool preprocess_command(MonOpRequestRef op); + bool prepare_command(MonOpRequestRef op); void get_health(list >& summary, list > *detail) const; diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index e79fe090a3fe1..a0abbee38b9a0 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -434,7 +434,9 @@ void OSDMonitor::on_active() list ls; take_all_failures(ls); while (!ls.empty()) { - dispatch(ls.front()); + MonOpRequestRef op = + mon->op_tracker.create_request(ls.front()); + dispatch(op); ls.pop_front(); } } @@ -1251,34 +1253,35 @@ void OSDMonitor::encode_trim_extra(MonitorDBStore::TransactionRef tx, // ------------- -bool OSDMonitor::preprocess_query(PaxosServiceMessage *m) +bool OSDMonitor::preprocess_query(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { // READs case MSG_MON_COMMAND: - return preprocess_command(static_cast(m)); + return preprocess_command(op); case CEPH_MSG_MON_GET_OSDMAP: - return preprocess_get_osdmap(static_cast(m)); + return preprocess_get_osdmap(op); // damp updates case MSG_OSD_MARK_ME_DOWN: - return preprocess_mark_me_down(static_cast(m)); + return preprocess_mark_me_down(op); case MSG_OSD_FAILURE: - return preprocess_failure(static_cast(m)); + return preprocess_failure(op); case MSG_OSD_BOOT: - return preprocess_boot(static_cast(m)); + return preprocess_boot(op); case MSG_OSD_ALIVE: - return preprocess_alive(static_cast(m)); + return preprocess_alive(op); case MSG_OSD_PGTEMP: - return preprocess_pgtemp(static_cast(m)); + return preprocess_pgtemp(op); case CEPH_MSG_POOLOP: - return preprocess_pool_op(static_cast(m)); + return preprocess_pool_op(op); case MSG_REMOVE_SNAPS: - return preprocess_remove_snaps(static_cast(m)); + return preprocess_remove_snaps(op); default: assert(0); @@ -1287,31 +1290,32 @@ bool OSDMonitor::preprocess_query(PaxosServiceMessage *m) } } -bool OSDMonitor::prepare_update(PaxosServiceMessage *m) +bool OSDMonitor::prepare_update(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); dout(7) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { // damp updates case MSG_OSD_MARK_ME_DOWN: - return prepare_mark_me_down(static_cast(m)); + return prepare_mark_me_down(op); case MSG_OSD_FAILURE: - return prepare_failure(static_cast(m)); + return prepare_failure(op); case MSG_OSD_BOOT: - return prepare_boot(static_cast(m)); + return prepare_boot(op); case MSG_OSD_ALIVE: - return prepare_alive(static_cast(m)); + return prepare_alive(op); case MSG_OSD_PGTEMP: - return prepare_pgtemp(static_cast(m)); + return prepare_pgtemp(op); case MSG_MON_COMMAND: - return prepare_command(static_cast(m)); + return prepare_command(op); case CEPH_MSG_POOLOP: - return prepare_pool_op(static_cast(m)); + return prepare_pool_op(op); case MSG_REMOVE_SNAPS: - return prepare_remove_snaps(static_cast(m)); + return prepare_remove_snaps(op); default: assert(0); @@ -1347,8 +1351,9 @@ bool OSDMonitor::should_propose(double& delay) // --------------------------- // READs -bool OSDMonitor::preprocess_get_osdmap(MMonGetOSDMap *m) +bool OSDMonitor::preprocess_get_osdmap(MonOpRequestRef op) { + MMonGetOSDMap *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; MOSDMap *reply = new MOSDMap(mon->monmap->fsid); epoch_t first = get_first_committed(); @@ -1398,8 +1403,9 @@ bool OSDMonitor::check_source(PaxosServiceMessage *m, uuid_d fsid) { } -bool OSDMonitor::preprocess_failure(MOSDFailure *m) +bool OSDMonitor::preprocess_failure(MonOpRequestRef op) { + MOSDFailure *m = static_cast(op->get_req()); // who is target_osd int badboy = m->get_target().name.num(); @@ -1459,14 +1465,15 @@ bool OSDMonitor::preprocess_failure(MOSDFailure *m) class C_AckMarkedDown : public Context { OSDMonitor *osdmon; - MOSDMarkMeDown *m; + MonOpRequestRef op; public: C_AckMarkedDown( OSDMonitor *osdmon, - MOSDMarkMeDown *m) - : osdmon(osdmon), m(m) {} + MonOpRequestRef op) + : osdmon(osdmon), op(op) {} void finish(int) { + MOSDMarkMeDown *m = static_cast(op->get_req()); osdmon->mon->send_reply( m, new MOSDMarkMeDown( @@ -1480,8 +1487,9 @@ public: } }; -bool OSDMonitor::preprocess_mark_me_down(MOSDMarkMeDown *m) +bool OSDMonitor::preprocess_mark_me_down(MonOpRequestRef op) { + MOSDMarkMeDown *m = static_cast(op->get_req()); int requesting_down = m->get_target().name.num(); int from = m->get_orig_source().num(); @@ -1511,14 +1519,15 @@ bool OSDMonitor::preprocess_mark_me_down(MOSDMarkMeDown *m) reply: if (m->request_ack) { - Context *c(new C_AckMarkedDown(this, m)); + Context *c(new C_AckMarkedDown(this, op)); c->complete(0); } return true; } -bool OSDMonitor::prepare_mark_me_down(MOSDMarkMeDown *m) +bool OSDMonitor::prepare_mark_me_down(MonOpRequestRef op) { + MOSDMarkMeDown *m = static_cast(op->get_req()); int target_osd = m->get_target().name.num(); assert(osdmap.is_up(target_osd)); @@ -1527,7 +1536,7 @@ bool OSDMonitor::prepare_mark_me_down(MOSDMarkMeDown *m) mon->clog->info() << "osd." << target_osd << " marked itself down\n"; pending_inc.new_state[target_osd] = CEPH_OSD_UP; if (m->request_ack) - wait_for_finished_proposal(new C_AckMarkedDown(this, m)); + wait_for_finished_proposal(new C_AckMarkedDown(this, op)); return true; } @@ -1678,8 +1687,9 @@ bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi) return false; } -bool OSDMonitor::prepare_failure(MOSDFailure *m) +bool OSDMonitor::prepare_failure(MonOpRequestRef op) { + MOSDFailure *m = static_cast(op->get_req()); dout(1) << "prepare_failure " << m->get_target() << " from " << m->get_orig_source_inst() << " is reporting failure:" << m->if_osd_failed() << dendl; @@ -1771,8 +1781,9 @@ void OSDMonitor::take_all_failures(list& ls) // boot -- -bool OSDMonitor::preprocess_boot(MOSDBoot *m) +bool OSDMonitor::preprocess_boot(MonOpRequestRef op) { + MOSDBoot *m = static_cast(op->get_req()); int from = m->get_orig_source_inst().name.num(); // check permissions, ignore if failed (no response expected) @@ -1823,7 +1834,7 @@ bool OSDMonitor::preprocess_boot(MOSDBoot *m) // yup. dout(7) << "preprocess_boot dup from " << m->get_orig_source_inst() << " == " << osdmap.get_inst(from) << dendl; - _booted(m, false); + _booted(op, false); return true; } @@ -1859,8 +1870,9 @@ bool OSDMonitor::preprocess_boot(MOSDBoot *m) return true; } -bool OSDMonitor::prepare_boot(MOSDBoot *m) +bool OSDMonitor::prepare_boot(MonOpRequestRef op) { + MOSDBoot *m = static_cast(op->get_req()); dout(7) << "prepare_boot from " << m->get_orig_source_inst() << " sb " << m->sb << " cluster_addr " << m->cluster_addr << " hb_back_addr " << m->hb_back_addr @@ -1893,11 +1905,11 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m) // mark previous guy down pending_inc.new_state[from] = CEPH_OSD_UP; } - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); } else if (pending_inc.new_up_client.count(from)) { //FIXME: should this be using new_up_client? // already prepared, just wait dout(7) << "prepare_boot already prepared, waiting on " << m->get_orig_source_addr() << dendl; - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); } else { // mark new guy up. pending_inc.new_up_client[from] = m->get_orig_source_addr(); @@ -1991,14 +2003,15 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m) pending_inc.new_xinfo[from] = xi; // wait - wait_for_finished_proposal(new C_Booted(this, m)); + wait_for_finished_proposal(new C_Booted(this, op)); } return true; } -void OSDMonitor::_booted(MOSDBoot *m, bool logit) +void OSDMonitor::_booted(MonOpRequestRef op, bool logit) { - dout(7) << "_booted " << m->get_orig_source_inst() + MOSDBoot *m = static_cast(op->get_req()); + dout(7) << "_booted " << m->get_orig_source_inst() << " w " << m->sb.weight << " from " << m->sb.current_epoch << dendl; if (logit) { @@ -2012,8 +2025,9 @@ void OSDMonitor::_booted(MOSDBoot *m, bool logit) // ------------- // alive -bool OSDMonitor::preprocess_alive(MOSDAlive *m) +bool OSDMonitor::preprocess_alive(MonOpRequestRef op) { + MOSDAlive *m = static_cast(op->get_req()); int from = m->get_orig_source().num(); // check permissions, ignore if failed @@ -2035,7 +2049,7 @@ bool OSDMonitor::preprocess_alive(MOSDAlive *m) if (osdmap.get_up_thru(from) >= m->want) { // yup. dout(7) << "preprocess_alive want up_thru " << m->want << " dup from " << m->get_orig_source_inst() << dendl; - _reply_map(m, m->version); + _reply_map(op, m->version); return true; } @@ -2048,8 +2062,9 @@ bool OSDMonitor::preprocess_alive(MOSDAlive *m) return true; } -bool OSDMonitor::prepare_alive(MOSDAlive *m) +bool OSDMonitor::prepare_alive(MonOpRequestRef op) { + MOSDAlive *m = static_cast(op->get_req()); int from = m->get_orig_source().num(); if (0) { // we probably don't care much about these @@ -2059,12 +2074,13 @@ bool OSDMonitor::prepare_alive(MOSDAlive *m) dout(7) << "prepare_alive want up_thru " << m->want << " have " << m->version << " from " << m->get_orig_source_inst() << dendl; pending_inc.new_up_thru[from] = m->version; // set to the latest map the OSD has - wait_for_finished_proposal(new C_ReplyMap(this, m, m->version)); + wait_for_finished_proposal(new C_ReplyMap(this, op, m->version)); return true; } -void OSDMonitor::_reply_map(PaxosServiceMessage *m, epoch_t e) +void OSDMonitor::_reply_map(MonOpRequestRef op, epoch_t e) { + PaxosServiceMessage *m = static_cast(op->get_req()); dout(7) << "_reply_map " << e << " from " << m->get_orig_source_inst() << dendl; @@ -2074,8 +2090,9 @@ void OSDMonitor::_reply_map(PaxosServiceMessage *m, epoch_t e) // ------------- // pg_temp changes -bool OSDMonitor::preprocess_pgtemp(MOSDPGTemp *m) +bool OSDMonitor::preprocess_pgtemp(MonOpRequestRef op) { + MOSDPGTemp *m = static_cast(op->get_req()); dout(10) << "preprocess_pgtemp " << *m << dendl; vector empty; int from = m->get_orig_source().num(); @@ -2137,7 +2154,7 @@ bool OSDMonitor::preprocess_pgtemp(MOSDPGTemp *m) goto ignore; dout(7) << "preprocess_pgtemp e" << m->map_epoch << " no changes from " << m->get_orig_source_inst() << dendl; - _reply_map(m, m->map_epoch); + _reply_map(op, m->map_epoch); return true; ignore: @@ -2145,8 +2162,9 @@ bool OSDMonitor::preprocess_pgtemp(MOSDPGTemp *m) return true; } -bool OSDMonitor::prepare_pgtemp(MOSDPGTemp *m) +bool OSDMonitor::prepare_pgtemp(MonOpRequestRef op) { + MOSDPGTemp *m = static_cast(op->get_req()); int from = m->get_orig_source().num(); dout(7) << "prepare_pgtemp e" << m->map_epoch << " from " << m->get_orig_source_inst() << dendl; for (map >::iterator p = m->pg_temp.begin(); p != m->pg_temp.end(); ++p) { @@ -2171,15 +2189,16 @@ bool OSDMonitor::prepare_pgtemp(MOSDPGTemp *m) pending_inc.new_primary_temp[p->first] = -1; } pending_inc.new_up_thru[from] = m->map_epoch; // set up_thru too, so the osd doesn't have to ask again - wait_for_finished_proposal(new C_ReplyMap(this, m, m->map_epoch)); + wait_for_finished_proposal(new C_ReplyMap(this, op, m->map_epoch)); return true; } // --- -bool OSDMonitor::preprocess_remove_snaps(MRemoveSnaps *m) +bool OSDMonitor::preprocess_remove_snaps(MonOpRequestRef op) { + MRemoveSnaps *m = static_cast(op->get_req()); dout(7) << "preprocess_remove_snaps " << *m << dendl; // check privilege, ignore if failed @@ -2214,8 +2233,9 @@ bool OSDMonitor::preprocess_remove_snaps(MRemoveSnaps *m) return true; } -bool OSDMonitor::prepare_remove_snaps(MRemoveSnaps *m) +bool OSDMonitor::prepare_remove_snaps(MonOpRequestRef op) { + MRemoveSnaps *m = static_cast(op->get_req()); dout(7) << "prepare_remove_snaps " << *m << dendl; for (map >::iterator p = m->snaps.begin(); @@ -2842,8 +2862,9 @@ namespace { } -bool OSDMonitor::preprocess_command(MMonCommand *m) +bool OSDMonitor::preprocess_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); int r = 0; bufferlist rdata; stringstream ss, ds; @@ -3928,8 +3949,9 @@ void OSDMonitor::get_pools_health( } -int OSDMonitor::prepare_new_pool(MPoolOp *m) +int OSDMonitor::prepare_new_pool(MonOpRequestRef op) { + MPoolOp *m = static_cast(op->get_req()); dout(10) << "prepare_new_pool from " << m->get_connection() << dendl; MonSession *session = m->get_session(); if (!session) @@ -4419,26 +4441,28 @@ int OSDMonitor::prepare_new_pool(string& name, uint64_t auid, return 0; } -bool OSDMonitor::prepare_set_flag(MMonCommand *m, int flag) +bool OSDMonitor::prepare_set_flag(MonOpRequestRef op, int flag) { + MMonCommand *m = static_cast(op->get_req()); ostringstream ss; if (pending_inc.new_flags < 0) pending_inc.new_flags = osdmap.get_flags(); pending_inc.new_flags |= flag; ss << "set " << OSDMap::get_flag_string(flag); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(), + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(), get_last_committed() + 1)); return true; } -bool OSDMonitor::prepare_unset_flag(MMonCommand *m, int flag) +bool OSDMonitor::prepare_unset_flag(MonOpRequestRef op, int flag) { + MMonCommand *m = static_cast(op->get_req()); ostringstream ss; if (pending_inc.new_flags < 0) pending_inc.new_flags = osdmap.get_flags(); pending_inc.new_flags &= ~flag; ss << "unset " << OSDMap::get_flag_string(flag); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(), + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(), get_last_committed() + 1)); return true; } @@ -4824,8 +4848,9 @@ int OSDMonitor::prepare_command_pool_set(map &cmdmap, return 0; } -bool OSDMonitor::prepare_command(MMonCommand *m) +bool OSDMonitor::prepare_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); stringstream ss; map cmdmap; if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { @@ -4840,12 +4865,13 @@ bool OSDMonitor::prepare_command(MMonCommand *m) return true; } - return prepare_command_impl(m, cmdmap); + return prepare_command_impl(op, cmdmap); } -bool OSDMonitor::prepare_command_impl(MMonCommand *m, +bool OSDMonitor::prepare_command_impl(MonOpRequestRef op, map &cmdmap) { + MMonCommand *m = static_cast(op->get_req()); bool ret = false; stringstream ss; string rs; @@ -5069,7 +5095,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, ss << action << " item id " << osdid << " name '" << name << "' weight " << weight << " at location " << loc << " to crush map"; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; @@ -5114,7 +5140,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, ss << "create-or-move updating item name '" << name << "' weight " << weight << " at location " << loc << " to crush map"; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } @@ -5149,7 +5175,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, pending_inc.crush.clear(); newcrush.encode(pending_inc.crush); getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } @@ -5211,7 +5237,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, err = 0; } } - wait_for_finished_proposal(new Monitor::C_Command(mon, m, err, ss.str(), + wait_for_finished_proposal(new Monitor::C_Command(mon, op, err, ss.str(), get_last_committed() + 1)); return true; } else if (prefix == "osd crush rm" || @@ -5234,7 +5260,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, err = 0; ss << "device '" << name << "' does not appear in the crush map"; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } @@ -5264,7 +5290,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, newcrush.encode(pending_inc.crush); ss << "removed item id " << id << " name '" << name << "' from crush map"; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } @@ -5280,7 +5306,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, newcrush.encode(pending_inc.crush); ss << "reweighted crush hierarchy"; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else if (prefix == "osd crush reweight") { @@ -5318,7 +5344,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, ss << "reweighted item id " << id << " name '" << name << "' to " << w << " in crush map"; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else if (prefix == "osd crush reweight-subtree") { @@ -5356,7 +5382,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, ss << "reweighted subtree id " << id << " name '" << name << "' to " << w << " in crush map"; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else if (prefix == "osd crush tunables") { @@ -5393,7 +5419,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, newcrush.encode(pending_inc.crush); ss << "adjusted tunables profile to " << profile; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else if (prefix == "osd crush set-tunable") { @@ -5433,7 +5459,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, newcrush.encode(pending_inc.crush); ss << "adjusted tunable " << tunable << " to " << value; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; @@ -5474,7 +5500,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, newcrush.encode(pending_inc.crush); } getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; @@ -5500,7 +5526,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, } getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else { @@ -5581,7 +5607,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, } getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; @@ -5642,7 +5668,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, } getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; @@ -5685,7 +5711,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, newcrush.encode(pending_inc.crush); } getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; @@ -5725,43 +5751,43 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, pending_inc.new_max_osd = newmax; ss << "set new max_osd = " << pending_inc.new_max_osd; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else if (prefix == "osd pause") { - return prepare_set_flag(m, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR); + return prepare_set_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR); } else if (prefix == "osd unpause") { - return prepare_unset_flag(m, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR); + return prepare_unset_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR); } else if (prefix == "osd set") { string key; cmd_getval(g_ceph_context, cmdmap, "key", key); if (key == "full") - return prepare_set_flag(m, CEPH_OSDMAP_FULL); + return prepare_set_flag(op, CEPH_OSDMAP_FULL); else if (key == "pause") - return prepare_set_flag(m, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR); + return prepare_set_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR); else if (key == "noup") - return prepare_set_flag(m, CEPH_OSDMAP_NOUP); + return prepare_set_flag(op, CEPH_OSDMAP_NOUP); else if (key == "nodown") - return prepare_set_flag(m, CEPH_OSDMAP_NODOWN); + return prepare_set_flag(op, CEPH_OSDMAP_NODOWN); else if (key == "noout") - return prepare_set_flag(m, CEPH_OSDMAP_NOOUT); + return prepare_set_flag(op, CEPH_OSDMAP_NOOUT); else if (key == "noin") - return prepare_set_flag(m, CEPH_OSDMAP_NOIN); + return prepare_set_flag(op, CEPH_OSDMAP_NOIN); else if (key == "nobackfill") - return prepare_set_flag(m, CEPH_OSDMAP_NOBACKFILL); + return prepare_set_flag(op, CEPH_OSDMAP_NOBACKFILL); else if (key == "norebalance") - return prepare_set_flag(m, CEPH_OSDMAP_NOREBALANCE); + return prepare_set_flag(op, CEPH_OSDMAP_NOREBALANCE); else if (key == "norecover") - return prepare_set_flag(m, CEPH_OSDMAP_NORECOVER); + return prepare_set_flag(op, CEPH_OSDMAP_NORECOVER); else if (key == "noscrub") - return prepare_set_flag(m, CEPH_OSDMAP_NOSCRUB); + return prepare_set_flag(op, CEPH_OSDMAP_NOSCRUB); else if (key == "nodeep-scrub") - return prepare_set_flag(m, CEPH_OSDMAP_NODEEP_SCRUB); + return prepare_set_flag(op, CEPH_OSDMAP_NODEEP_SCRUB); else if (key == "notieragent") - return prepare_set_flag(m, CEPH_OSDMAP_NOTIERAGENT); + return prepare_set_flag(op, CEPH_OSDMAP_NOTIERAGENT); else { ss << "unrecognized flag '" << key << "'"; err = -EINVAL; @@ -5771,29 +5797,29 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, string key; cmd_getval(g_ceph_context, cmdmap, "key", key); if (key == "full") - return prepare_unset_flag(m, CEPH_OSDMAP_FULL); + return prepare_unset_flag(op, CEPH_OSDMAP_FULL); else if (key == "pause") - return prepare_unset_flag(m, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR); + return prepare_unset_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR); else if (key == "noup") - return prepare_unset_flag(m, CEPH_OSDMAP_NOUP); + return prepare_unset_flag(op, CEPH_OSDMAP_NOUP); else if (key == "nodown") - return prepare_unset_flag(m, CEPH_OSDMAP_NODOWN); + return prepare_unset_flag(op, CEPH_OSDMAP_NODOWN); else if (key == "noout") - return prepare_unset_flag(m, CEPH_OSDMAP_NOOUT); + return prepare_unset_flag(op, CEPH_OSDMAP_NOOUT); else if (key == "noin") - return prepare_unset_flag(m, CEPH_OSDMAP_NOIN); + return prepare_unset_flag(op, CEPH_OSDMAP_NOIN); else if (key == "nobackfill") - return prepare_unset_flag(m, CEPH_OSDMAP_NOBACKFILL); + return prepare_unset_flag(op, CEPH_OSDMAP_NOBACKFILL); else if (key == "norebalance") - return prepare_unset_flag(m, CEPH_OSDMAP_NOREBALANCE); + return prepare_unset_flag(op, CEPH_OSDMAP_NOREBALANCE); else if (key == "norecover") - return prepare_unset_flag(m, CEPH_OSDMAP_NORECOVER); + return prepare_unset_flag(op, CEPH_OSDMAP_NORECOVER); else if (key == "noscrub") - return prepare_unset_flag(m, CEPH_OSDMAP_NOSCRUB); + return prepare_unset_flag(op, CEPH_OSDMAP_NOSCRUB); else if (key == "nodeep-scrub") - return prepare_unset_flag(m, CEPH_OSDMAP_NODEEP_SCRUB); + return prepare_unset_flag(op, CEPH_OSDMAP_NODEEP_SCRUB); else if (key == "notieragent") - return prepare_unset_flag(m, CEPH_OSDMAP_NOTIERAGENT); + return prepare_unset_flag(op, CEPH_OSDMAP_NOTIERAGENT); else { ss << "unrecognized flag '" << key << "'"; err = -EINVAL; @@ -5868,7 +5894,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, } if (any) { getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, err, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, err, rs, get_last_committed() + 1)); return true; } @@ -6007,7 +6033,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, pending_inc.new_primary_affinity[id] = ww; ss << "set osd." << id << " primary-affinity to " << w << " (" << ios::hex << ww << ios::dec << ")"; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } @@ -6036,7 +6062,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, pending_inc.new_weight[id] = ww; ss << "reweighted osd." << id << " to " << w << " (" << ios::hex << ww << ios::dec << ")"; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } @@ -6062,7 +6088,7 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, pending_inc.new_lost[id] = e; ss << "marked osd lost in epoch " << e; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } @@ -6121,14 +6147,14 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m, } if (pending_inc.new_state.count(id)) { // osd is about to exist - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); return true; } i = id; } if (pending_inc.identify_osd(uuid) >= 0) { // osd is about to exist - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); return true; } if (i >= 0) { @@ -6169,7 +6195,7 @@ done: ss << i; rdata.append(ss); } - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, rdata, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, rdata, get_last_committed() + 1)); return true; @@ -6192,7 +6218,7 @@ done: pending_inc.new_blacklist[addr] = expires; ss << "blacklisting " << addr << " until " << expires << " (" << d << " sec)"; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else if (blacklistop == "rm") { @@ -6204,7 +6230,7 @@ done: pending_inc.new_blacklist.erase(addr); ss << "un-blacklisting " << addr; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } @@ -6249,7 +6275,7 @@ done: ss << "created pool " << poolstr << " snap " << snapname; } getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else if (prefix == "osd pool rmsnap") { @@ -6289,7 +6315,7 @@ done: ss << "already removed pool " << poolstr << " snap " << snapname; } getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else if (prefix == "osd pool create") { @@ -6383,7 +6409,7 @@ done: int ruleset; err = get_crush_ruleset(ruleset_name, &ruleset, &ss); if (err == -EAGAIN) { - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); return true; } if (err) @@ -6410,7 +6436,7 @@ done: ss << "pool '" << poolstr << "' already exists"; break; case -EAGAIN: - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); return true; case -ERANGE: goto reply; @@ -6422,7 +6448,7 @@ done: ss << "pool '" << poolstr << "' created"; } getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; @@ -6448,7 +6474,7 @@ done: } err = _prepare_remove_pool(pool, &ss); if (err == -EAGAIN) { - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); return true; } if (err < 0) @@ -6492,7 +6518,7 @@ done: << cpp_strerror(ret); } getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, ret, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, ret, rs, get_last_committed() + 1)); return true; @@ -6504,7 +6530,7 @@ done: goto reply; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } else if (prefix == "osd tier add") { @@ -6566,14 +6592,14 @@ done: pg_pool_t *np = pending_inc.get_new_pool(pool_id, p); pg_pool_t *ntp = pending_inc.get_new_pool(tierpool_id, tp); if (np->tiers.count(tierpool_id) || ntp->is_tier()) { - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); return true; } np->tiers.insert(tierpool_id); np->set_snap_epoch(pending_inc.epoch); // tier will update to our snap info ntp->tier_of = pool_id; ss << "pool '" << tierpoolstr << "' is now (or already was) a tier of '" << poolstr << "'"; - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(), + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(), get_last_committed() + 1)); return true; } else if (prefix == "osd tier remove") { @@ -6626,13 +6652,13 @@ done: if (np->tiers.count(tierpool_id) == 0 || ntp->tier_of != pool_id || np->read_tier == tierpool_id) { - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); return true; } np->tiers.erase(tierpool_id); ntp->clear_tier(); ss << "pool '" << tierpoolstr << "' is now (or already was) not a tier of '" << poolstr << "'"; - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(), + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(), get_last_committed() + 1)); return true; } else if (prefix == "osd tier set-overlay") { @@ -6687,7 +6713,7 @@ done: ss << "overlay for '" << poolstr << "' is now (or already was) '" << overlaypoolstr << "'"; if (overlay_p->cache_mode == pg_pool_t::CACHEMODE_NONE) ss <<" (WARNING: overlay pool cache_mode is still NONE)"; - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(), + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(), get_last_committed() + 1)); return true; } else if (prefix == "osd tier remove-overlay") { @@ -6717,7 +6743,7 @@ done: np->clear_write_tier(); np->last_force_op_resend = pending_inc.epoch; ss << "there is now (or already was) no overlay for '" << poolstr << "'"; - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(), + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(), get_last_committed() + 1)); return true; } else if (prefix == "osd tier cache-mode") { @@ -6840,7 +6866,7 @@ done: base_pool->write_tier == pool_id) ss <<" (WARNING: pool is still configured as read or write tier)"; } - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(), + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(), get_last_committed() + 1)); return true; } else if (prefix == "osd tier add-cache") { @@ -6916,7 +6942,7 @@ done: pg_pool_t *np = pending_inc.get_new_pool(pool_id, p); pg_pool_t *ntp = pending_inc.get_new_pool(tierpool_id, tp); if (np->tiers.count(tierpool_id) || ntp->is_tier()) { - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); return true; } np->tiers.insert(tierpool_id); @@ -6930,7 +6956,7 @@ done: ntp->hit_set_params = hsp; ntp->target_max_bytes = size; ss << "pool '" << tierpoolstr << "' is now (or already was) a cache tier of '" << poolstr << "'"; - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, ss.str(), + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, ss.str(), get_last_committed() + 1)); return true; } else if (prefix == "osd pool set-quota") { @@ -6972,7 +6998,7 @@ done: } ss << "set-quota " << field << " = " << value << " for pool " << poolstr; rs = ss.str(); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; @@ -6988,7 +7014,7 @@ done: } else { ss << "SUCCESSFUL reweight-by-utilization: " << out_str; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } @@ -7017,7 +7043,7 @@ done: } else { ss << "SUCCESSFUL reweight-by-pg: " << out_str; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; } @@ -7042,23 +7068,24 @@ done: update: getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1)); return true; wait: - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); return true; } -bool OSDMonitor::preprocess_pool_op(MPoolOp *m) +bool OSDMonitor::preprocess_pool_op(MonOpRequestRef op) { + MPoolOp *m = static_cast(op->get_req()); if (m->op == POOL_OP_CREATE) - return preprocess_pool_op_create(m); + return preprocess_pool_op_create(op); if (!osdmap.get_pg_pool(m->pool)) { dout(10) << "attempt to delete non-existent pool id " << m->pool << dendl; - _pool_op_reply(m, 0, osdmap.get_epoch()); + _pool_op_reply(op, 0, osdmap.get_epoch()); return true; } @@ -7071,43 +7098,43 @@ bool OSDMonitor::preprocess_pool_op(MPoolOp *m) switch (m->op) { case POOL_OP_CREATE_SNAP: if (p->is_unmanaged_snaps_mode()) { - _pool_op_reply(m, -EINVAL, osdmap.get_epoch()); + _pool_op_reply(op, -EINVAL, osdmap.get_epoch()); return true; } if (snap_exists) { - _pool_op_reply(m, 0, osdmap.get_epoch()); + _pool_op_reply(op, 0, osdmap.get_epoch()); return true; } return false; case POOL_OP_CREATE_UNMANAGED_SNAP: if (p->is_pool_snaps_mode()) { - _pool_op_reply(m, -EINVAL, osdmap.get_epoch()); + _pool_op_reply(op, -EINVAL, osdmap.get_epoch()); return true; } return false; case POOL_OP_DELETE_SNAP: if (p->is_unmanaged_snaps_mode()) { - _pool_op_reply(m, -EINVAL, osdmap.get_epoch()); + _pool_op_reply(op, -EINVAL, osdmap.get_epoch()); return true; } if (!snap_exists) { - _pool_op_reply(m, 0, osdmap.get_epoch()); + _pool_op_reply(op, 0, osdmap.get_epoch()); return true; } return false; case POOL_OP_DELETE_UNMANAGED_SNAP: if (p->is_pool_snaps_mode()) { - _pool_op_reply(m, -EINVAL, osdmap.get_epoch()); + _pool_op_reply(op, -EINVAL, osdmap.get_epoch()); return true; } if (p->is_removed_snap(m->snapid)) { - _pool_op_reply(m, 0, osdmap.get_epoch()); + _pool_op_reply(op, 0, osdmap.get_epoch()); return true; } return false; case POOL_OP_DELETE: if (osdmap.lookup_pg_pool_name(m->name.c_str()) >= 0) { - _pool_op_reply(m, 0, osdmap.get_epoch()); + _pool_op_reply(op, 0, osdmap.get_epoch()); return true; } return false; @@ -7121,44 +7148,46 @@ bool OSDMonitor::preprocess_pool_op(MPoolOp *m) return false; } -bool OSDMonitor::preprocess_pool_op_create(MPoolOp *m) +bool OSDMonitor::preprocess_pool_op_create(MonOpRequestRef op) { + MPoolOp *m = static_cast(op->get_req()); MonSession *session = m->get_session(); if (!session) { - _pool_op_reply(m, -EPERM, osdmap.get_epoch()); + _pool_op_reply(op, -EPERM, osdmap.get_epoch()); return true; } if (!session->is_capable("osd", MON_CAP_W)) { dout(5) << "attempt to create new pool without sufficient auid privileges!" << "message: " << *m << std::endl << "caps: " << session->caps << dendl; - _pool_op_reply(m, -EPERM, osdmap.get_epoch()); + _pool_op_reply(op, -EPERM, osdmap.get_epoch()); return true; } int64_t pool = osdmap.lookup_pg_pool_name(m->name.c_str()); if (pool >= 0) { - _pool_op_reply(m, 0, osdmap.get_epoch()); + _pool_op_reply(op, 0, osdmap.get_epoch()); return true; } return false; } -bool OSDMonitor::prepare_pool_op(MPoolOp *m) +bool OSDMonitor::prepare_pool_op(MonOpRequestRef op) { + MPoolOp *m = static_cast(op->get_req()); dout(10) << "prepare_pool_op " << *m << dendl; if (m->op == POOL_OP_CREATE) { - return prepare_pool_op_create(m); + return prepare_pool_op_create(op); } else if (m->op == POOL_OP_DELETE) { - return prepare_pool_op_delete(m); + return prepare_pool_op_delete(op); } int ret = 0; bool changed = false; if (!osdmap.have_pg_pool(m->pool)) { - _pool_op_reply(m, -ENOENT, osdmap.get_epoch()); + _pool_op_reply(op, -ENOENT, osdmap.get_epoch()); return false; } @@ -7178,14 +7207,14 @@ bool OSDMonitor::prepare_pool_op(MPoolOp *m) } else { ret = -EINVAL; } - _pool_op_reply(m, ret, osdmap.get_epoch()); + _pool_op_reply(op, ret, osdmap.get_epoch()); return false; case POOL_OP_DELETE_UNMANAGED_SNAP: // we won't allow removal of an unmanaged snapshot from a pool // not in unmanaged snaps mode. if (!pool->is_unmanaged_snaps_mode()) { - _pool_op_reply(m, -ENOTSUP, osdmap.get_epoch()); + _pool_op_reply(op, -ENOTSUP, osdmap.get_epoch()); return false; } /* fall-thru */ @@ -7193,7 +7222,7 @@ bool OSDMonitor::prepare_pool_op(MPoolOp *m) // but we will allow creating an unmanaged snapshot on any pool // as long as it is not in 'pool' snaps mode. if (pool->is_pool_snaps_mode()) { - _pool_op_reply(m, -EINVAL, osdmap.get_epoch()); + _pool_op_reply(op, -EINVAL, osdmap.get_epoch()); return false; } } @@ -7278,14 +7307,15 @@ bool OSDMonitor::prepare_pool_op(MPoolOp *m) } out: - wait_for_finished_proposal(new OSDMonitor::C_PoolOp(this, m, ret, pending_inc.epoch, &reply_data)); + wait_for_finished_proposal(new OSDMonitor::C_PoolOp(this, op, ret, pending_inc.epoch, &reply_data)); return true; } -bool OSDMonitor::prepare_pool_op_create(MPoolOp *m) +bool OSDMonitor::prepare_pool_op_create(MonOpRequestRef op) { - int err = prepare_new_pool(m); - wait_for_finished_proposal(new OSDMonitor::C_PoolOp(this, m, err, pending_inc.epoch)); + MPoolOp *m = static_cast(op->get_req()); + int err = prepare_new_pool(op); + wait_for_finished_proposal(new OSDMonitor::C_PoolOp(this, op, err, pending_inc.epoch)); return true; } @@ -7484,23 +7514,26 @@ int OSDMonitor::_prepare_rename_pool(int64_t pool, string newname) return 0; } -bool OSDMonitor::prepare_pool_op_delete(MPoolOp *m) +bool OSDMonitor::prepare_pool_op_delete(MonOpRequestRef op) { + MPoolOp *m = static_cast(op->get_req()); ostringstream ss; int ret = _prepare_remove_pool(m->pool, &ss); if (ret == -EAGAIN) { - wait_for_finished_proposal(new C_RetryMessage(this, m)); + wait_for_finished_proposal(new C_RetryMessage(this, op)); return true; } if (ret < 0) dout(10) << __func__ << " got " << ret << " " << ss.str() << dendl; - wait_for_finished_proposal(new OSDMonitor::C_PoolOp(this, m, ret, + wait_for_finished_proposal(new OSDMonitor::C_PoolOp(this, op, ret, pending_inc.epoch)); return true; } -void OSDMonitor::_pool_op_reply(MPoolOp *m, int ret, epoch_t epoch, bufferlist *blp) +void OSDMonitor::_pool_op_reply(MonOpRequestRef op, + int ret, epoch_t epoch, bufferlist *blp) { + MPoolOp *m = static_cast(op->get_req()); dout(20) << "_pool_op_reply " << ret << dendl; MPoolOpReply *reply = new MPoolOpReply(m->fsid, m->get_tid(), ret, epoch, get_last_committed(), blp); diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index 71a69d0a3c53e..bc5ffb522d7bd 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -44,6 +44,9 @@ class PGMap; #include "erasure-code/ErasureCodeInterface.h" +#include "common/TrackedOp.h" +#include "mon/MonOpRequest.h" + #define OSD_METADATA_PREFIX "osd_metadata" /// information about a particular peer's failure reports for one osd @@ -210,8 +213,8 @@ private: void update_logger(); void handle_query(PaxosServiceMessage *m); - bool preprocess_query(PaxosServiceMessage *m); // true if processed. - bool prepare_update(PaxosServiceMessage *m); + bool preprocess_query(MonOpRequestRef op); // true if processed. + bool prepare_update(MonOpRequestRef op); bool should_propose(double &delay); version_t get_trim_to(); @@ -235,27 +238,27 @@ private: bool check_source(PaxosServiceMessage *m, uuid_d fsid); - bool preprocess_get_osdmap(class MMonGetOSDMap *m); + bool preprocess_get_osdmap(MonOpRequestRef op); - bool preprocess_mark_me_down(class MOSDMarkMeDown *m); + bool preprocess_mark_me_down(MonOpRequestRef op); friend class C_AckMarkedDown; - bool preprocess_failure(class MOSDFailure *m); - bool prepare_failure(class MOSDFailure *m); - bool prepare_mark_me_down(class MOSDMarkMeDown *m); + bool preprocess_failure(MonOpRequestRef op); + bool prepare_failure(MonOpRequestRef op); + bool prepare_mark_me_down(MonOpRequestRef op); void process_failures(); void take_all_failures(list& ls); - bool preprocess_boot(class MOSDBoot *m); - bool prepare_boot(class MOSDBoot *m); - void _booted(MOSDBoot *m, bool logit); + bool preprocess_boot(MonOpRequestRef op); + bool prepare_boot(MonOpRequestRef op); + void _booted(MonOpRequestRef op, bool logit); - bool preprocess_alive(class MOSDAlive *m); - bool prepare_alive(class MOSDAlive *m); - void _reply_map(PaxosServiceMessage *m, epoch_t e); + bool preprocess_alive(MonOpRequestRef op); + bool prepare_alive(MonOpRequestRef op); + void _reply_map(MonOpRequestRef op, epoch_t e); - bool preprocess_pgtemp(class MOSDPGTemp *m); - bool prepare_pgtemp(class MOSDPGTemp *m); + bool preprocess_pgtemp(MonOpRequestRef op); + bool prepare_pgtemp(MonOpRequestRef op); int _check_remove_pool(int64_t pool, const pg_pool_t *pi, ostream *ss); bool _check_become_tier( @@ -269,11 +272,11 @@ private: int _prepare_remove_pool(int64_t pool, ostream *ss); int _prepare_rename_pool(int64_t pool, string newname); - bool preprocess_pool_op ( class MPoolOp *m); - bool preprocess_pool_op_create ( class MPoolOp *m); - bool prepare_pool_op (MPoolOp *m); - bool prepare_pool_op_create (MPoolOp *m); - bool prepare_pool_op_delete(MPoolOp *m); + bool preprocess_pool_op (MonOpRequestRef op); + bool preprocess_pool_op_create (MonOpRequestRef op); + bool prepare_pool_op (MonOpRequestRef op); + bool prepare_pool_op_create (MonOpRequestRef op); + bool prepare_pool_op_delete(MonOpRequestRef op); int crush_rename_bucket(const string& srcname, const string& dstname, ostream *ss); @@ -315,31 +318,35 @@ private: const unsigned pool_type, const uint64_t expected_num_objects, ostream *ss); - int prepare_new_pool(MPoolOp *m); + int prepare_new_pool(MonOpRequestRef op); void update_pool_flags(int64_t pool_id, uint64_t flags); bool update_pools_status(); void get_pools_health(list >& summary, list > *detail) const; - bool prepare_set_flag(MMonCommand *m, int flag); - bool prepare_unset_flag(MMonCommand *m, int flag); + bool prepare_set_flag(MonOpRequestRef op, int flag); + bool prepare_unset_flag(MonOpRequestRef op, int flag); - void _pool_op_reply(MPoolOp *m, int ret, epoch_t epoch, bufferlist *blp=NULL); + void _pool_op_reply(MonOpRequestRef op, + int ret, epoch_t epoch, bufferlist *blp=NULL); struct C_Booted : public Context { OSDMonitor *cmon; - MOSDBoot *m; + MonOpRequestRef op; + // MOSDBoot *m; bool logit; - C_Booted(OSDMonitor *cm, MOSDBoot *m_, bool l=true) : - cmon(cm), m(m_), logit(l) {} + C_Booted(OSDMonitor *cm, MonOpRequestRef op_, bool l=true) : + cmon(cm), op(op_), logit(l) {} void finish(int r) { if (r >= 0) - cmon->_booted(m, logit); + cmon->_booted(op, logit); else if (r == -ECANCELED) - m->put(); + return; +// m->put(); else if (r == -EAGAIN) - cmon->dispatch((PaxosServiceMessage*)m); + cmon->dispatch(op); +// cmon->dispatch((PaxosServiceMessage*)m); else assert(0 == "bad C_Booted return value"); } @@ -347,52 +354,62 @@ private: struct C_ReplyMap : public Context { OSDMonitor *osdmon; - PaxosServiceMessage *m; + MonOpRequestRef op; +// PaxosServiceMessage *m; epoch_t e; - C_ReplyMap(OSDMonitor *o, PaxosServiceMessage *mm, epoch_t ee) : osdmon(o), m(mm), e(ee) {} + C_ReplyMap(OSDMonitor *o, MonOpRequestRef op_, epoch_t ee) + : osdmon(o), op(op_), e(ee) {} void finish(int r) { if (r >= 0) - osdmon->_reply_map(m, e); + osdmon->_reply_map(op, e); else if (r == -ECANCELED) - m->put(); + return; + //m->put(); else if (r == -EAGAIN) - osdmon->dispatch(m); + osdmon->dispatch(op); else assert(0 == "bad C_ReplyMap return value"); } }; struct C_PoolOp : public Context { OSDMonitor *osdmon; - MPoolOp *m; + MonOpRequestRef op; +// MPoolOp *m; int replyCode; int epoch; bufferlist reply_data; - C_PoolOp(OSDMonitor * osd, MPoolOp *m_, int rc, int e, bufferlist *rd=NULL) : - osdmon(osd), m(m_), replyCode(rc), epoch(e) { + C_PoolOp(OSDMonitor * osd, MonOpRequestRef op_, int rc, int e, bufferlist *rd=NULL) : + osdmon(osd), op(op_), replyCode(rc), epoch(e) { if (rd) reply_data = *rd; } void finish(int r) { if (r >= 0) - osdmon->_pool_op_reply(m, replyCode, epoch, &reply_data); + osdmon->_pool_op_reply(op, replyCode, epoch, &reply_data); else if (r == -ECANCELED) - m->put(); + return; + //m->put(); else if (r == -EAGAIN) - osdmon->dispatch(m); + osdmon->dispatch(op); else assert(0 == "bad C_PoolOp return value"); } }; - bool preprocess_remove_snaps(struct MRemoveSnaps *m); - bool prepare_remove_snaps(struct MRemoveSnaps *m); + bool preprocess_remove_snaps(MonOpRequestRef op); + bool prepare_remove_snaps(MonOpRequestRef op); + + CephContext *cct; + OpTracker op_tracker; int load_metadata(int osd, map& m, ostream *err); public: - OSDMonitor(Monitor *mn, Paxos *p, string service_name) + OSDMonitor(CephContext *cct, Monitor *mn, Paxos *p, string service_name) : PaxosService(mn, p, service_name), - thrash_map(0), thrash_last_up_osd(-1) { } + thrash_map(0), thrash_last_up_osd(-1), + op_tracker(cct, true, 1) + { } void tick(); // check state, take actions @@ -400,9 +417,9 @@ private: void get_health(list >& summary, list > *detail) const; - bool preprocess_command(MMonCommand *m); - bool prepare_command(MMonCommand *m); - bool prepare_command_impl(MMonCommand *m, map &cmdmap); + bool preprocess_command(MonOpRequestRef op); + bool prepare_command(MonOpRequestRef op); + bool prepare_command_impl(MonOpRequestRef op, map& cmdmap); int set_crash_replay_interval(const int64_t pool_id, const uint32_t cri); int prepare_command_pool_set(map &cmdmap, diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index 6705c5edbf6d7..9f6aa9c55765f 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -575,21 +575,22 @@ version_t PGMonitor::get_trim_to() return 0; } -bool PGMonitor::preprocess_query(PaxosServiceMessage *m) +bool PGMonitor::preprocess_query(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { case CEPH_MSG_STATFS: - handle_statfs(static_cast(m)); + handle_statfs(op); return true; case MSG_GETPOOLSTATS: - return preprocess_getpoolstats(static_cast(m)); + return preprocess_getpoolstats(op); case MSG_PGSTATS: - return preprocess_pg_stats(static_cast(m)); + return preprocess_pg_stats(op); case MSG_MON_COMMAND: - return preprocess_command(static_cast(m)); + return preprocess_command(op); default: @@ -599,15 +600,16 @@ bool PGMonitor::preprocess_query(PaxosServiceMessage *m) } } -bool PGMonitor::prepare_update(PaxosServiceMessage *m) +bool PGMonitor::prepare_update(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { case MSG_PGSTATS: - return prepare_pg_stats((MPGStats*)m); + return prepare_pg_stats(op); case MSG_MON_COMMAND: - return prepare_command(static_cast(m)); + return prepare_command(op); default: assert(0); @@ -616,8 +618,9 @@ bool PGMonitor::prepare_update(PaxosServiceMessage *m) } } -void PGMonitor::handle_statfs(MStatfs *statfs) +void PGMonitor::handle_statfs(MonOpRequestRef op) { + MStatfs *statfs = static_cast(op->get_req()); // check caps MonSession *session = statfs->get_session(); if (!session) @@ -651,8 +654,9 @@ void PGMonitor::handle_statfs(MStatfs *statfs) statfs->put(); } -bool PGMonitor::preprocess_getpoolstats(MGetPoolStats *m) +bool PGMonitor::preprocess_getpoolstats(MonOpRequestRef op) { + MGetPoolStats *m = static_cast(op->get_req()); MGetPoolStatsReply *reply; MonSession *session = m->get_session(); @@ -690,8 +694,9 @@ bool PGMonitor::preprocess_getpoolstats(MGetPoolStats *m) } -bool PGMonitor::preprocess_pg_stats(MPGStats *stats) +bool PGMonitor::preprocess_pg_stats(MonOpRequestRef op) { + MPGStats *stats = static_cast(op->get_req()); // check caps MonSession *session = stats->get_session(); if (!session) { @@ -742,8 +747,9 @@ bool PGMonitor::pg_stats_have_changed(int from, const MPGStats *stats) const return false; } -bool PGMonitor::prepare_pg_stats(MPGStats *stats) +bool PGMonitor::prepare_pg_stats(MonOpRequestRef op) { + MPGStats *stats = static_cast(op->get_req()); dout(10) << "prepare_pg_stats " << *stats << " from " << stats->get_orig_source() << dendl; int from = stats->get_orig_source().num(); @@ -791,6 +797,7 @@ bool PGMonitor::prepare_pg_stats(MPGStats *stats) // pg stats MPGStatsAck *ack = new MPGStatsAck; + MonOpRequestRef ack_op = mon->op_tracker.create_request(ack); ack->set_tid(stats->get_tid()); for (map::iterator p = stats->pg_stat.begin(); p != stats->pg_stat.end(); @@ -835,12 +842,14 @@ bool PGMonitor::prepare_pg_stats(MPGStats *stats) */ } - wait_for_finished_proposal(new C_Stats(this, stats, ack)); + wait_for_finished_proposal(new C_Stats(this, op, ack_op)); return true; } -void PGMonitor::_updated_stats(MPGStats *req, MPGStatsAck *ack) +void PGMonitor::_updated_stats(MonOpRequestRef op, MonOpRequestRef ack_op) { + MPGStats *req = static_cast(op->get_req()); + MPGStats *ack = static_cast(ack_op->get_req()); dout(7) << "_updated_stats for " << req->get_orig_source_inst() << dendl; mon->send_reply(req, ack); req->put(); @@ -1456,8 +1465,9 @@ void PGMonitor::dump_info(Formatter *f) f->dump_unsigned("pgmap_last_committed", get_last_committed()); } -bool PGMonitor::preprocess_command(MMonCommand *m) +bool PGMonitor::preprocess_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); int r = -1; bufferlist rdata; stringstream ss, ds; @@ -1782,8 +1792,9 @@ bool PGMonitor::preprocess_command(MMonCommand *m) return true; } -bool PGMonitor::prepare_command(MMonCommand *m) +bool PGMonitor::prepare_command(MonOpRequestRef op) { + MMonCommand *m = static_cast(op->get_req()); stringstream ss; pg_t pgid; epoch_t epoch = mon->osdmon()->osdmap.get_epoch(); @@ -1862,7 +1873,7 @@ bool PGMonitor::prepare_command(MMonCommand *m) update: getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, r, rs, + wait_for_finished_proposal(new Monitor::C_Command(mon, op, r, rs, get_last_committed() + 1)); return true; } diff --git a/src/mon/PGMonitor.h b/src/mon/PGMonitor.h index 97a9ac1194b95..80d26f5cecd06 100644 --- a/src/mon/PGMonitor.h +++ b/src/mon/PGMonitor.h @@ -74,40 +74,46 @@ private: void read_pgmap_full(); void apply_pgmap_delta(bufferlist& bl); - bool preprocess_query(PaxosServiceMessage *m); // true if processed. - bool prepare_update(PaxosServiceMessage *m); + bool preprocess_query(MonOpRequestRef op); // true if processed. + bool prepare_update(MonOpRequestRef op); - bool preprocess_pg_stats(MPGStats *stats); + bool preprocess_pg_stats(MonOpRequestRef op); bool pg_stats_have_changed(int from, const MPGStats *stats) const; - bool prepare_pg_stats(MPGStats *stats); - void _updated_stats(MPGStats *req, MPGStatsAck *ack); + bool prepare_pg_stats(MonOpRequestRef op); + void _updated_stats(MonOpRequestRef op, MonOpRequestRef ack_op); struct C_Stats : public Context { PGMonitor *pgmon; - MPGStats *req; - MPGStatsAck *ack; + MonOpRequestRef stats_op; + MonOpRequestRef stats_op_ack; +// MPGStats *req; +// MPGStatsAck *ack; entity_inst_t who; - C_Stats(PGMonitor *p, MPGStats *r, MPGStatsAck *a) : pgmon(p), req(r), ack(a) {} + C_Stats(PGMonitor *p, + MonOpRequestRef op, + MonOpRequestRef op_ack) + : pgmon(p), stats_op(op), stats_op_ack(op_ack) {} void finish(int r) { if (r >= 0) { - pgmon->_updated_stats(req, ack); + pgmon->_updated_stats(stats_op, stats_op_ack); } else if (r == -ECANCELED) { - req->put(); - ack->put(); + return; +// req->put(); +// ack->put(); } else if (r == -EAGAIN) { - pgmon->dispatch(req); - ack->put(); + pgmon->dispatch(stats_op); +// ack->put(); } else { assert(0 == "bad C_Stats return value"); } } }; - void handle_statfs(MStatfs *statfs); - bool preprocess_getpoolstats(MGetPoolStats *m); + void handle_statfs(MonOpRequestRef op); + bool preprocess_getpoolstats(MonOpRequestRef op); - bool preprocess_command(MMonCommand *m); - bool prepare_command(MMonCommand *m); + bool preprocess_command(MonOpRequestRef op); + bool prepare_command(MonOpRequestRef op); map last_sent_pg_create; // per osd throttle diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index 5863792f61dc9..481e8f4ecc0bd 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -192,8 +192,9 @@ void Paxos::collect(version_t oldpn) // peon -void Paxos::handle_collect(MMonPaxos *collect) +void Paxos::handle_collect(MonOpRequestRef op) { + MMonPaxos *collect = static_cast(op->get_req()); dout(10) << "handle_collect " << *collect << dendl; assert(mon->is_peon()); // mon epoch filter should catch strays @@ -449,8 +450,9 @@ void Paxos::_sanity_check_store() // leader -void Paxos::handle_last(MMonPaxos *last) +void Paxos::handle_last(MonOpRequestRef op) { + MMonPaxos *last = static_cast(op->get_req()); bool need_refresh = false; int from = last->get_source().num(); @@ -689,8 +691,9 @@ void Paxos::begin(bufferlist& v) } // peon -void Paxos::handle_begin(MMonPaxos *begin) +void Paxos::handle_begin(MonOpRequestRef op) { + MMonPaxos *begin = static_cast(op->get_req()); dout(10) << "handle_begin " << *begin << dendl; // can we accept this? @@ -749,8 +752,9 @@ void Paxos::handle_begin(MMonPaxos *begin) } // leader -void Paxos::handle_accept(MMonPaxos *accept) +void Paxos::handle_accept(MonOpRequestRef op) { + MMonPaxos *accept = static_cast(op->get_req()); dout(10) << "handle_accept " << *accept << dendl; int from = accept->get_source().num(); @@ -917,8 +921,9 @@ void Paxos::commit_finish() } -void Paxos::handle_commit(MMonPaxos *commit) +void Paxos::handle_commit(MonOpRequestRef op) { + MMonPaxos *commit = static_cast(op->get_req()); dout(10) << "handle_commit on " << commit->last_committed << dendl; logger->inc(l_paxos_commit); @@ -1061,8 +1066,9 @@ void Paxos::finish_round() // peon -void Paxos::handle_lease(MMonPaxos *lease) +void Paxos::handle_lease(MonOpRequestRef op) { + MMonPaxos *lease = static_cast(op->get_req()); // sanity if (!mon->is_peon() || last_committed != lease->last_committed) { @@ -1109,8 +1115,9 @@ void Paxos::handle_lease(MMonPaxos *lease) lease->put(); } -void Paxos::handle_lease_ack(MMonPaxos *ack) +void Paxos::handle_lease_ack(MonOpRequestRef op) { + MMonPaxos *ack = static_cast(op->get_req()); int from = ack->get_source().num(); if (!lease_ack_timeout_event) { @@ -1363,8 +1370,9 @@ void Paxos::restart() } -void Paxos::dispatch(PaxosServiceMessage *m) +void Paxos::dispatch(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); // election in progress? if (!mon->is_leader() && !mon->is_peon()) { dout(5) << "election in progress, dropping " << *m << dendl; @@ -1386,25 +1394,25 @@ void Paxos::dispatch(PaxosServiceMessage *m) switch (pm->op) { // learner case MMonPaxos::OP_COLLECT: - handle_collect(pm); + handle_collect(op); break; case MMonPaxos::OP_LAST: - handle_last(pm); + handle_last(op); break; case MMonPaxos::OP_BEGIN: - handle_begin(pm); + handle_begin(op); break; case MMonPaxos::OP_ACCEPT: - handle_accept(pm); + handle_accept(op); break; case MMonPaxos::OP_COMMIT: - handle_commit(pm); + handle_commit(op); break; case MMonPaxos::OP_LEASE: - handle_lease(pm); + handle_lease(op); break; case MMonPaxos::OP_LEASE_ACK: - handle_lease_ack(pm); + handle_lease_ack(op); break; default: assert(0); diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h index 12b35db72eb25..2fe6403d59492 100644 --- a/src/mon/Paxos.h +++ b/src/mon/Paxos.h @@ -122,6 +122,7 @@ e 12v #include #include "MonitorDBStore.h" +#include "mon/MonOpRequest.h" class Monitor; class MMonPaxos; @@ -775,7 +776,7 @@ private: * * @param collect The collect message sent by the Leader to the Peon. */ - void handle_collect(MMonPaxos *collect); + void handle_collect(MonOpRequestRef op); /** * Handle a response from a Peon to the Leader's collect phase. * @@ -806,7 +807,7 @@ private: * * @param last The message sent by the Peon to the Leader. */ - void handle_last(MMonPaxos *last); + void handle_last(MonOpRequestRef op); /** * The Recovery Phase timed out, meaning that a significant part of the * quorum does not believe we are the Leader, and we thus should trigger new @@ -867,7 +868,7 @@ private: * Paxos::begin function * */ - void handle_begin(MMonPaxos *begin); + void handle_begin(MonOpRequestRef op); /** * Handle an Accept message sent by a Peon. * @@ -892,7 +893,7 @@ private: * @param accept The message sent by the Peons to the Leader during the * Paxos::handle_begin function */ - void handle_accept(MMonPaxos *accept); + void handle_accept(MonOpRequestRef op); /** * Trigger a fresh election. * @@ -946,7 +947,7 @@ private: * @param commit The message sent by the Leader to the Peon during * Paxos::commit */ - void handle_commit(MMonPaxos *commit); + void handle_commit(MonOpRequestRef op); /** * Extend the system's lease. * @@ -990,7 +991,7 @@ private: * @param lease The message sent by the Leader to the Peon during the * Paxos::extend_lease function */ - void handle_lease(MMonPaxos *lease); + void handle_lease(MonOpRequestRef op); /** * Account for all the Lease Acks the Leader receives from the Peons. * @@ -1007,7 +1008,7 @@ private: * @param ack The message sent by a Peon to the Leader during the * Paxos::handle_lease function */ - void handle_lease_ack(MMonPaxos *ack); + void handle_lease_ack(MonOpRequestRef op); /** * Call fresh elections because at least one Peon didn't acked our lease. * @@ -1111,7 +1112,7 @@ public: return paxos_name; } - void dispatch(PaxosServiceMessage *m); + void dispatch(MonOpRequestRef op); void read_and_prepare_transactions(MonitorDBStore::TransactionRef tx, version_t from, version_t last); diff --git a/src/mon/PaxosService.cc b/src/mon/PaxosService.cc index 4bdffc285f463..69fa9a5a4fcea 100644 --- a/src/mon/PaxosService.cc +++ b/src/mon/PaxosService.cc @@ -22,6 +22,8 @@ #include "include/assert.h" #include "common/Formatter.h" +#include "mon/MonOpRequest.h" + #define dout_subsys ceph_subsys_paxos #undef dout_prefix #define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed()) @@ -32,8 +34,10 @@ static ostream& _prefix(std::ostream *_dout, Monitor *mon, Paxos *paxos, string << ").paxosservice(" << service_name << " " << fc << ".." << lc << ") "; } -bool PaxosService::dispatch(PaxosServiceMessage *m) +bool PaxosService::dispatch(MonOpRequestRef op) { + PaxosServiceMessage *m = static_cast(op->get_req()); + dout(10) << "dispatch " << *m << " from " << m->get_orig_source_inst() << dendl; if (mon->is_shutdown()) { @@ -66,12 +70,12 @@ bool PaxosService::dispatch(PaxosServiceMessage *m) // make sure our map is readable and up to date if (!is_readable(m->version)) { dout(10) << " waiting for paxos -> readable (v" << m->version << ")" << dendl; - wait_for_readable(new C_RetryMessage(this, m), m->version); + wait_for_readable(new C_RetryMessage(this, op), m->version); return true; } // preprocess - if (preprocess_query(m)) + if (preprocess_query(op)) return true; // easy! // leader? @@ -83,12 +87,12 @@ bool PaxosService::dispatch(PaxosServiceMessage *m) // writeable? if (!is_writeable()) { dout(10) << " waiting for paxos -> writeable" << dendl; - wait_for_writeable(new C_RetryMessage(this, m)); + wait_for_writeable(new C_RetryMessage(this, op)); return true; } // update - if (prepare_update(m)) { + if (prepare_update(op)) { double delay = 0.0; if (should_propose(delay)) { if (delay == 0.0) { diff --git a/src/mon/PaxosService.h b/src/mon/PaxosService.h index 0e7ef7add689f..b876164b63d96 100644 --- a/src/mon/PaxosService.h +++ b/src/mon/PaxosService.h @@ -104,14 +104,16 @@ protected: */ class C_RetryMessage : public Context { PaxosService *svc; - PaxosServiceMessage *m; + MonOpRequestRef op; +// PaxosServiceMessage *m; public: - C_RetryMessage(PaxosService *s, PaxosServiceMessage *m_) : svc(s), m(m_) {} + C_RetryMessage(PaxosService *s, MonOpRequestRef op_) : svc(s), op(op_) {} void finish(int r) { if (r == -EAGAIN || r >= 0) - svc->dispatch(m); + svc->dispatch(op); else if (r == -ECANCELED) - m->put(); + return; +// m->put(); else assert(0 == "bad C_RetryMessage return value"); } @@ -319,7 +321,7 @@ public: * @param m A message * @returns 'true' on successful dispatch; 'false' otherwise. */ - bool dispatch(PaxosServiceMessage *m); + bool dispatch(MonOpRequestRef op); void refresh(bool *need_bootstrap); void post_refresh(); @@ -403,7 +405,7 @@ public: * answered, was a state change that has no effect); 'false' * otherwise. */ - virtual bool preprocess_query(PaxosServiceMessage *m) = 0; + virtual bool preprocess_query(MonOpRequestRef op) = 0; /** * Apply the message to the pending state. @@ -414,7 +416,7 @@ public: * @returns 'true' if the update message was handled (e.g., a command that * went through); 'false' otherwise. */ - virtual bool prepare_update(PaxosServiceMessage *m) = 0; + virtual bool prepare_update(MonOpRequestRef op) = 0; /** * @} */ diff --git a/src/mon/QuorumService.h b/src/mon/QuorumService.h index ef9dcdcf1bd6c..69d5390587841 100644 --- a/src/mon/QuorumService.h +++ b/src/mon/QuorumService.h @@ -82,7 +82,7 @@ protected: return (mon->is_leader() || mon->is_peon()); } - virtual bool service_dispatch(Message *m) = 0; + virtual bool service_dispatch(MonOpRequestRef op) = 0; virtual void service_tick() = 0; virtual void service_shutdown() = 0; @@ -107,8 +107,8 @@ public: return epoch; } - bool dispatch(Message *m) { - return service_dispatch(m); + bool dispatch(MonOpRequestRef op) { + return service_dispatch(op); } void tick() { -- 2.39.5