#include "messages/MAuthReply.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
#include "messages/MPing.h"
#include "messages/MMonSubscribe.h"
case CEPH_MSG_MON_SUBSCRIBE_ACK:
case CEPH_MSG_MON_GET_VERSION_REPLY:
case MSG_MON_COMMAND_ACK:
+ case MSG_COMMAND_REPLY:
case MSG_LOGACK:
case MSG_CONFIG:
break;
std::lock_guard lock(monc_lock);
- if (_hunting()) {
- auto p = _find_pending_con(m->get_connection());
- if (p == pending_cons.end()) {
- // ignore any messages outside hunting sessions
+ if (!m->get_connection()->is_anon() &&
+ m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
+ if (_hunting()) {
+ auto p = _find_pending_con(m->get_connection());
+ if (p == pending_cons.end()) {
+ // ignore any messages outside hunting sessions
+ ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
+ m->put();
+ return true;
+ }
+ } else if (!active_con || active_con->get_con() != m->get_connection()) {
+ // ignore any messages outside our session(s)
ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
m->put();
return true;
}
- } else if (!active_con || active_con->get_con() != m->get_connection()) {
- // ignore any messages outside our session(s)
- ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
- m->put();
- return true;
}
switch (m->get_type()) {
case MSG_MON_COMMAND_ACK:
handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
break;
+ case MSG_COMMAND_REPLY:
+ if (m->get_connection()->is_anon() &&
+ m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
+ // this connection is from 'tell'... ignore everything except our command
+ // reply. (we'll get misc other message because we authenticated, but we
+ // don't need them.)
+ handle_command_reply(static_cast<MCommandReply*>(m));
+ return true;
+ }
+ // leave the message for another dispatch handler (e.g., Objecter)
+ return false;
case MSG_LOGACK:
if (log_client) {
log_client->handle_log_ack(static_cast<MLogAck*>(m));
void MonClient::handle_auth(MAuthReply *m)
{
ceph_assert(ceph_mutex_is_locked(monc_lock));
+
+ if (m->get_connection()->is_anon()) {
+ // anon connection, used for mon tell commands
+ for (auto& p : mon_commands) {
+ if (p.second->target_con == m->get_connection()) {
+ auto& mc = p.second->target_session;
+ int ret = mc->handle_auth(m, entity_name,
+ CEPH_ENTITY_TYPE_MON,
+ rotating_secrets.get());
+ (void)ret; // we don't care
+ break;
+ }
+ }
+ m->put();
+ return;
+ }
+
if (!_hunting()) {
std::swap(active_con->get_auth(), auth);
int ret = active_con->authenticate(m);
if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
return false;
+ if (con->is_anon()) {
+ auto p = mon_commands.begin();
+ while (p != mon_commands.end()) {
+ auto cmd = p->second;
+ ++p;
+ if (cmd->target_con == con) {
+ _send_command(cmd); // may retry or fail
+ break;
+ }
+ }
+ return true;
+ }
+
if (_hunting()) {
if (pending_cons.count(con->get_peer_addrs())) {
ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
});
_check_auth_tickets();
+ _check_tell_commands();
if (_hunting()) {
ldout(cct, 1) << "continuing hunt" << dendl;
void MonClient::_send_command(MonCommand *r)
{
- ++r->send_attempts;
-
- entity_addr_t peer;
- if (active_con) {
- peer = active_con->get_con()->get_peer_addr();
- }
-
- if (r->target_rank >= 0 &&
- r->target_rank != monmap.get_rank(peer)) {
+ if (r->is_tell()) {
+ ++r->send_attempts;
if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
_finish_command(r, -ENXIO, "mon unavailable");
return;
}
- ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
- << " wants rank " << r->target_rank
- << ", reopening session"
- << dendl;
- if (r->target_rank >= (int)monmap.size()) {
- ldout(cct, 10) << " target " << r->target_rank << " >= max mon " << monmap.size() << dendl;
- _finish_command(r, -ENOENT, "mon rank dne");
+
+ // tell-style command
+ if (monmap.min_mon_release >= ceph_release_t::octopus) {
+ if (r->target_rank >= (int)monmap.size()) {
+ ldout(cct, 10) << " target " << r->target_rank
+ << " >= max mon " << monmap.size() << dendl;
+ _finish_command(r, -ENOENT, "mon rank dne");
+ return;
+ }
+ if (!monmap.contains(r->target_name)) {
+ ldout(cct, 10) << " target " << r->target_name
+ << " not present in monmap" << dendl;
+ _finish_command(r, -ENOENT, "mon dne");
+ return;
+ }
+
+ if (r->target_con) {
+ r->target_con->mark_down();
+ }
+ if (r->target_rank >= 0) {
+ r->target_con = messenger->connect_to_mon(
+ monmap.get_addrs(r->target_rank), true /* anon */);
+ } else {
+ r->target_con = messenger->connect_to_mon(
+ monmap.get_addrs(r->target_name), true /* anon */);
+ }
+
+ r->target_session.reset(new MonConnection(cct, r->target_con, 0,
+ &auth_registry));
+ r->target_session->start(monmap.get_epoch(), entity_name);
+ r->last_send_attempt = ceph_clock_now();
+
+ MCommand *m = new MCommand(monmap.fsid);
+ m->set_tid(r->tid);
+ m->cmd = r->cmd;
+ m->set_data(r->inbl);
+ r->target_session->queue_command(m);
return;
}
- _reopen_session(r->target_rank);
- return;
- }
- if (r->target_name.length() &&
- r->target_name != monmap.get_name(peer)) {
- if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
- _finish_command(r, -ENXIO, "mon unavailable");
+ // ugly legacy handling of pre-octopus mons
+ entity_addr_t peer;
+ if (active_con) {
+ peer = active_con->get_con()->get_peer_addr();
+ }
+
+ if (r->target_rank >= 0 &&
+ r->target_rank != monmap.get_rank(peer)) {
+ ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
+ << " wants rank " << r->target_rank
+ << ", reopening session"
+ << dendl;
+ if (r->target_rank >= (int)monmap.size()) {
+ ldout(cct, 10) << " target " << r->target_rank
+ << " >= max mon " << monmap.size() << dendl;
+ _finish_command(r, -ENOENT, "mon rank dne");
+ return;
+ }
+ _reopen_session(r->target_rank);
return;
}
- ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
- << " wants mon " << r->target_name
- << ", reopening session"
- << dendl;
- if (!monmap.contains(r->target_name)) {
- ldout(cct, 10) << " target " << r->target_name << " not present in monmap" << dendl;
- _finish_command(r, -ENOENT, "mon dne");
+ if (r->target_name.length() &&
+ r->target_name != monmap.get_name(peer)) {
+ ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
+ << " wants mon " << r->target_name
+ << ", reopening session"
+ << dendl;
+ if (!monmap.contains(r->target_name)) {
+ ldout(cct, 10) << " target " << r->target_name
+ << " not present in monmap" << dendl;
+ _finish_command(r, -ENOENT, "mon dne");
+ return;
+ }
+ _reopen_session(monmap.get_rank(r->target_name));
return;
}
- _reopen_session(monmap.get_rank(r->target_name));
- return;
+ // fall-thru to send 'normal' CLI command
}
+ // normal CLI command
ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
auto m = ceph::make_message<MMonCommand>(monmap.fsid);
m->set_tid(r->tid);
return;
}
+void MonClient::_check_tell_commands()
+{
+ // resend any requests
+ auto now = ceph_clock_now();
+ auto p = mon_commands.begin();
+ while (p != mon_commands.end()) {
+ auto cmd = p->second;
+ ++p;
+ if (cmd->is_tell() &&
+ cmd->last_send_attempt != utime_t() &&
+ now - cmd->last_send_attempt > cct->_conf->mon_client_hunt_interval) {
+ ldout(cct,5) << __func__ << " timeout tell command " << cmd->tid << dendl;
+ _send_command(cmd); // might remove cmd from mon_commands
+ }
+ }
+}
+
void MonClient::_resend_mon_commands()
{
// resend any requests
while (p != mon_commands.end()) {
auto cmd = p->second;
++p;
- _send_command(cmd); // might remove cmd from mon_commands
+ if (!cmd->is_tell()) {
+ _send_command(cmd); // might remove cmd from mon_commands
+ }
}
}
ack->put();
}
+void MonClient::handle_command_reply(MCommandReply *reply)
+{
+ MonCommand *r = NULL;
+ uint64_t tid = reply->get_tid();
+
+ if (tid == 0 && !mon_commands.empty()) {
+ r = mon_commands.begin()->second;
+ ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid
+ << dendl;
+ } else {
+ auto p = mon_commands.find(tid);
+ if (p == mon_commands.end()) {
+ ldout(cct, 10) << __func__ << " " << reply->get_tid() << " not found"
+ << dendl;
+ reply->put();
+ return;
+ }
+ r = p->second;
+ }
+
+ ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
+ if (r->poutbl)
+ r->poutbl->claim(reply->get_data());
+ _finish_command(r, reply->r, reply->rs);
+ reply->put();
+}
+
int MonClient::_cancel_mon_command(uint64_t tid)
{
ceph_assert(ceph_mutex_is_locked(monc_lock));
*(r->prs) = rs;
if (r->onfinish)
finisher.queue(r->onfinish, ret);
+ if (r->target_con) {
+ r->target_con->mark_down();
+ }
mon_commands.erase(r->tid);
delete r;
}
// connection to mon?
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
ceph_assert(!auth_meta->authorizer);
+ if (con->is_anon()) {
+ for (auto& i : mon_commands) {
+ if (i.second->target_con == con) {
+ return i.second->target_session->get_auth_request(
+ auth_method, preferred_modes, bl,
+ entity_name, want_keys, rotating_secrets.get());
+ }
+ }
+ }
for (auto& i : pending_cons) {
if (i.second.is_con(con)) {
return i.second.get_auth_request(
std::lock_guard l(monc_lock);
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ if (con->is_anon()) {
+ for (auto& i : mon_commands) {
+ if (i.second->target_con == con) {
+ return i.second->target_session->handle_auth_reply_more(
+ auth_meta, bl, reply);
+ }
+ }
+ }
for (auto& i : pending_cons) {
if (i.second.is_con(con)) {
return i.second.handle_auth_reply_more(auth_meta, bl, reply);
{
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
std::lock_guard l(monc_lock);
+ if (con->is_anon()) {
+ for (auto& i : mon_commands) {
+ if (i.second->target_con == con) {
+ return i.second->target_session->handle_auth_done(
+ auth_meta, global_id, bl,
+ session_key, connection_secret);
+ }
+ }
+ }
for (auto& i : pending_cons) {
if (i.second.is_con(con)) {
int r = i.second.handle_auth_done(
std::lock_guard l(monc_lock);
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ if (con->is_anon()) {
+ for (auto& i : mon_commands) {
+ if (i.second->target_con == con) {
+ int r = i.second->target_session->handle_auth_bad_method(
+ old_auth_method,
+ result,
+ allowed_methods,
+ allowed_modes);
+ if (r < 0) {
+ _finish_command(i.second, r, "auth failed");
+ }
+ return r;
+ }
+ }
+ }
for (auto& i : pending_cons) {
if (i.second.is_con(con)) {
int r = i.second.handle_auth_bad_method(old_auth_method,
state = State::HAVE_SESSION;
}
con->set_last_keepalive_ack(auth_start);
+
+ if (pending_tell_command) {
+ con->send_message2(std::move(pending_tell_command));
+ }
return auth_err;
}
auth->build_request(ma->auth_payload);
con->send_message(ma);
}
+ if (ret == 0 && pending_tell_command) {
+ con->send_message2(std::move(pending_tell_command));
+ }
+
return ret;
}