}
}
-void MonClient::_pick_new_mon()
+void MonClient::_reopen_session(int rank, string name)
{
assert(monc_lock.is_locked());
+ ldout(cct, 10) << "_reopen_session rank " << rank << " name " << name << dendl;
- cur_mon = _pick_random_mon();
+ if (rank < 0 && name.length() == 0) {
+ cur_mon = _pick_random_mon();
+ } else if (name.length()) {
+ cur_mon = name;
+ } else {
+ cur_mon = monmap.get_name(rank);
+ }
if (cur_con) {
messenger->mark_down(cur_con);
}
cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
- ldout(cct, 10) << "_pick_new_mon picked mon." << cur_mon << " con " << cur_con
+ ldout(cct, 10) << "picked mon." << cur_mon << " con " << cur_con
<< " addr " << cur_con->get_peer_addr()
<< dendl;
-}
-
-
-void MonClient::_reopen_session()
-{
- assert(monc_lock.is_locked());
- ldout(cct, 10) << "_reopen_session" << dendl;
-
- _pick_new_mon();
// throw out old queued messages
while (!waiting_for_session.empty()) {
void MonClient::_send_command(MonCommand *r)
{
version_t last_seen_version = 0;
+
+ if (r->target_rank >= 0 &&
+ r->target_rank != monmap.get_rank(cur_mon)) {
+ ldout(cct, 10) << "_send_command " << r->tid << " " << r->cmd
+ << " wants rank " << r->target_rank
+ << ", reopening session"
+ << dendl;
+ if (r->target_rank >= (int)monmap.size()) {
+ ldout(cct, 10) << " target " << r->target_rank << " >= max mon " << monmap.size() << dendl;
+ _finish_command(r, -ENOENT, "mon rank dne");
+ return;
+ }
+ _reopen_session(r->target_rank, string());
+ return;
+ }
+
+ if (r->target_name.length() &&
+ r->target_name != cur_mon) {
+ ldout(cct, 10) << "_send_command " << r->tid << " " << r->cmd
+ << " wants mon " << r->target_name
+ << ", reopening session"
+ << dendl;
+ if (!monmap.contains(r->target_name)) {
+ ldout(cct, 10) << " target " << r->target_name << " not present in monmap" << dendl;
+ _finish_command(r, -ENOENT, "mon dne");
+ return;
+ }
+ _reopen_session(-1, r->target_name);
+ return;
+ }
+
ldout(cct, 10) << "_send_command " << r->tid << " " << r->cmd << dendl;
MMonCommand *m = new MMonCommand(monmap.fsid, last_seen_version);
m->set_tid(r->tid);
return 0;
}
+int MonClient::start_mon_command(string name,
+ const vector<string>& cmd, bufferlist& inbl,
+ bufferlist *outbl, string *outs,
+ Context *onfinish)
+{
+ Mutex::Locker l(monc_lock);
+ MonCommand *r = new MonCommand(++last_mon_command_tid);
+ r->target_name = name;
+ r->cmd = cmd;
+ r->inbl = inbl;
+ r->poutbl = outbl;
+ r->prs = outs;
+ r->onfinish = onfinish;
+ mon_commands[r->tid] = r;
+ _send_command(r);
+ // can't fail
+ return 0;
+}
+
+int MonClient::start_mon_command(int rank,
+ const vector<string>& cmd, bufferlist& inbl,
+ bufferlist *outbl, string *outs,
+ Context *onfinish)
+{
+ Mutex::Locker l(monc_lock);
+ MonCommand *r = new MonCommand(++last_mon_command_tid);
+ r->target_rank = rank;
+ r->cmd = cmd;
+ r->inbl = inbl;
+ r->poutbl = outbl;
+ r->prs = outs;
+ r->onfinish = onfinish;
+ mon_commands[r->tid] = r;
+ _send_command(r);
+ return 0;
+}
+
// ---------
void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
string _pick_random_mon();
void _finish_hunting();
- void _reopen_session();
- void _pick_new_mon();
+ void _reopen_session(int rank, string name);
+ void _reopen_session() {
+ _reopen_session(-1, string());
+ }
void _send_mon_message(Message *m, bool force=false);
public:
private:
uint64_t last_mon_command_tid;
struct MonCommand {
+ string target_name;
+ int target_rank;
uint64_t tid;
vector<string> cmd;
bufferlist inbl;
Context *onfinish;
MonCommand(uint64_t t)
- : tid(t),
+ : target_rank(-1),
+ tid(t),
poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL)
{}
};
int start_mon_command(const vector<string>& cmd, bufferlist& inbl,
bufferlist *outbl, string *outs,
Context *onfinish);
+ int start_mon_command(int mon_rank,
+ const vector<string>& cmd, bufferlist& inbl,
+ bufferlist *outbl, string *outs,
+ Context *onfinish);
+ int start_mon_command(const string mon_name, ///< mon name, with mon. prefix
+ const vector<string>& cmd, bufferlist& inbl,
+ bufferlist *outbl, string *outs,
+ Context *onfinish);
// version requests
public: