]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon/MonClient: send tell commands out of band via MCommand
authorSage Weil <sage@redhat.com>
Fri, 6 Sep 2019 20:18:12 +0000 (15:18 -0500)
committerSage Weil <sage@redhat.com>
Tue, 1 Oct 2019 21:30:53 +0000 (16:30 -0500)
The current tell mon command handling is pretty fragile and semi-broken:
we force the client to (exclusively) connect to the target mon, disrupting
other monclient business, and the retry logic is fragile.

Instead, use entirely independent connections for each tell command, and
tear them down when we get a reply. Implement independent and simple
error handling and timeouts.

Keep most of the old behavior alive so that we can still use tell against
pre-octopus mons.

Signed-off-by: Sage Weil <sage@redhat.com>
src/mon/MonClient.cc
src/mon/MonClient.h

index 056f35ed3bae358a2488d4a43d3716e4e79f83e0..70b97c549638579aec6b7a623187aa97bd1a7e10 100644 (file)
@@ -35,6 +35,8 @@
 #include "messages/MAuthReply.h"
 #include "messages/MMonCommand.h"
 #include "messages/MMonCommandAck.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
 #include "messages/MPing.h"
 
 #include "messages/MMonSubscribe.h"
@@ -278,6 +280,7 @@ bool MonClient::ms_dispatch(Message *m)
   case CEPH_MSG_MON_SUBSCRIBE_ACK:
   case CEPH_MSG_MON_GET_VERSION_REPLY:
   case MSG_MON_COMMAND_ACK:
+  case MSG_COMMAND_REPLY:
   case MSG_LOGACK:
   case MSG_CONFIG:
     break;
@@ -290,19 +293,22 @@ bool MonClient::ms_dispatch(Message *m)
 
   std::lock_guard lock(monc_lock);
 
-  if (_hunting()) {
-    auto p = _find_pending_con(m->get_connection());
-    if (p == pending_cons.end()) {
-      // ignore any messages outside hunting sessions
+  if (!m->get_connection()->is_anon() &&
+      m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
+    if (_hunting()) {
+      auto p = _find_pending_con(m->get_connection());
+      if (p == pending_cons.end()) {
+       // ignore any messages outside hunting sessions
+       ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
+       m->put();
+       return true;
+      }
+    } else if (!active_con || active_con->get_con() != m->get_connection()) {
+      // ignore any messages outside our session(s)
       ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
       m->put();
       return true;
     }
-  } else if (!active_con || active_con->get_con() != m->get_connection()) {
-    // ignore any messages outside our session(s)
-    ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
-    m->put();
-    return true;
   }
 
   switch (m->get_type()) {
@@ -326,6 +332,17 @@ bool MonClient::ms_dispatch(Message *m)
   case MSG_MON_COMMAND_ACK:
     handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
     break;
+  case MSG_COMMAND_REPLY:
+    if (m->get_connection()->is_anon() &&
+        m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
+      // this connection is from 'tell'... ignore everything except our command
+      // reply.  (we'll get misc other message because we authenticated, but we
+      // don't need them.)
+      handle_command_reply(static_cast<MCommandReply*>(m));
+      return true;
+    }
+    // leave the message for another dispatch handler (e.g., Objecter)
+    return false;
   case MSG_LOGACK:
     if (log_client) {
       log_client->handle_log_ack(static_cast<MLogAck*>(m));
@@ -552,6 +569,23 @@ int MonClient::authenticate(double timeout)
 void MonClient::handle_auth(MAuthReply *m)
 {
   ceph_assert(ceph_mutex_is_locked(monc_lock));
+
+  if (m->get_connection()->is_anon()) {
+    // anon connection, used for mon tell commands
+    for (auto& p : mon_commands) {
+      if (p.second->target_con == m->get_connection()) {
+       auto& mc = p.second->target_session;
+       int ret = mc->handle_auth(m, entity_name,
+                                 CEPH_ENTITY_TYPE_MON,
+                                 rotating_secrets.get());
+       (void)ret; // we don't care
+       break;
+      }
+    }
+    m->put();
+    return;
+  }
+
   if (!_hunting()) {
     std::swap(active_con->get_auth(), auth);
     int ret = active_con->authenticate(m);
@@ -754,6 +788,19 @@ bool MonClient::ms_handle_reset(Connection *con)
   if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
     return false;
 
+  if (con->is_anon()) {
+    auto p = mon_commands.begin();
+    while (p != mon_commands.end()) {
+      auto cmd = p->second;
+      ++p;
+      if (cmd->target_con == con) {
+       _send_command(cmd); // may retry or fail
+       break;
+      }
+    }
+    return true;
+  }
+
   if (_hunting()) {
     if (pending_cons.count(con->get_peer_addrs())) {
       ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
@@ -848,6 +895,7 @@ void MonClient::tick()
     });
 
   _check_auth_tickets();
+  _check_tell_commands();
   
   if (_hunting()) {
     ldout(cct, 1) << "continuing hunt" << dendl;
@@ -1030,51 +1078,92 @@ int MonClient::wait_auth_rotating(double timeout)
 
 void MonClient::_send_command(MonCommand *r)
 {
-  ++r->send_attempts;
-
-  entity_addr_t peer;
-  if (active_con) {
-    peer = active_con->get_con()->get_peer_addr();
-  }
-
-  if (r->target_rank >= 0 &&
-      r->target_rank != monmap.get_rank(peer)) {
+  if (r->is_tell()) {
+    ++r->send_attempts;
     if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
       _finish_command(r, -ENXIO, "mon unavailable");
       return;
     }
-    ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
-                  << " wants rank " << r->target_rank
-                  << ", reopening session"
-                  << dendl;
-    if (r->target_rank >= (int)monmap.size()) {
-      ldout(cct, 10) << " target " << r->target_rank << " >= max mon " << monmap.size() << dendl;
-      _finish_command(r, -ENOENT, "mon rank dne");
+
+    // tell-style command
+    if (monmap.min_mon_release >= ceph_release_t::octopus) {
+      if (r->target_rank >= (int)monmap.size()) {
+       ldout(cct, 10) << " target " << r->target_rank
+                      << " >= max mon " << monmap.size() << dendl;
+       _finish_command(r, -ENOENT, "mon rank dne");
+       return;
+      }
+      if (!monmap.contains(r->target_name)) {
+       ldout(cct, 10) << " target " << r->target_name
+                      << " not present in monmap" << dendl;
+       _finish_command(r, -ENOENT, "mon dne");
+       return;
+      }
+
+      if (r->target_con) {
+       r->target_con->mark_down();
+      }
+      if (r->target_rank >= 0) {
+       r->target_con = messenger->connect_to_mon(
+         monmap.get_addrs(r->target_rank), true /* anon */);
+      } else {
+       r->target_con = messenger->connect_to_mon(
+         monmap.get_addrs(r->target_name), true /* anon */);
+      }
+
+      r->target_session.reset(new MonConnection(cct, r->target_con, 0,
+                                               &auth_registry));
+      r->target_session->start(monmap.get_epoch(), entity_name);
+      r->last_send_attempt = ceph_clock_now();
+
+      MCommand *m = new MCommand(monmap.fsid);
+      m->set_tid(r->tid);
+      m->cmd = r->cmd;
+      m->set_data(r->inbl);
+      r->target_session->queue_command(m);
       return;
     }
-    _reopen_session(r->target_rank);
-    return;
-  }
 
-  if (r->target_name.length() &&
-      r->target_name != monmap.get_name(peer)) {
-    if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
-      _finish_command(r, -ENXIO, "mon unavailable");
+    // ugly legacy handling of pre-octopus mons
+    entity_addr_t peer;
+    if (active_con) {
+      peer = active_con->get_con()->get_peer_addr();
+    }
+
+    if (r->target_rank >= 0 &&
+       r->target_rank != monmap.get_rank(peer)) {
+      ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
+                    << " wants rank " << r->target_rank
+                    << ", reopening session"
+                    << dendl;
+      if (r->target_rank >= (int)monmap.size()) {
+       ldout(cct, 10) << " target " << r->target_rank
+                      << " >= max mon " << monmap.size() << dendl;
+       _finish_command(r, -ENOENT, "mon rank dne");
+       return;
+      }
+      _reopen_session(r->target_rank);
       return;
     }
-    ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
-                  << " wants mon " << r->target_name
-                  << ", reopening session"
-                  << dendl;
-    if (!monmap.contains(r->target_name)) {
-      ldout(cct, 10) << " target " << r->target_name << " not present in monmap" << dendl;
-      _finish_command(r, -ENOENT, "mon dne");
+    if (r->target_name.length() &&
+       r->target_name != monmap.get_name(peer)) {
+      ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
+                    << " wants mon " << r->target_name
+                    << ", reopening session"
+                    << dendl;
+      if (!monmap.contains(r->target_name)) {
+       ldout(cct, 10) << " target " << r->target_name
+                      << " not present in monmap" << dendl;
+       _finish_command(r, -ENOENT, "mon dne");
+       return;
+      }
+      _reopen_session(monmap.get_rank(r->target_name));
       return;
     }
-    _reopen_session(monmap.get_rank(r->target_name));
-    return;
+    // fall-thru to send 'normal' CLI command
   }
 
+  // normal CLI command
   ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
   auto m = ceph::make_message<MMonCommand>(monmap.fsid);
   m->set_tid(r->tid);
@@ -1084,6 +1173,23 @@ void MonClient::_send_command(MonCommand *r)
   return;
 }
 
+void MonClient::_check_tell_commands()
+{
+  // resend any requests
+  auto now = ceph_clock_now();
+  auto p = mon_commands.begin();
+  while (p != mon_commands.end()) {
+    auto cmd = p->second;
+    ++p;
+    if (cmd->is_tell() &&
+       cmd->last_send_attempt != utime_t() &&
+       now - cmd->last_send_attempt > cct->_conf->mon_client_hunt_interval) {
+      ldout(cct,5) << __func__ << " timeout tell command " << cmd->tid << dendl;
+      _send_command(cmd); // might remove cmd from mon_commands
+    }
+  }
+}
+
 void MonClient::_resend_mon_commands()
 {
   // resend any requests
@@ -1091,7 +1197,9 @@ void MonClient::_resend_mon_commands()
   while (p != mon_commands.end()) {
     auto cmd = p->second;
     ++p;
-    _send_command(cmd); // might remove cmd from mon_commands
+    if (!cmd->is_tell()) {
+      _send_command(cmd); // might remove cmd from mon_commands
+    }
   }
 }
 
@@ -1120,6 +1228,33 @@ void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
   ack->put();
 }
 
+void MonClient::handle_command_reply(MCommandReply *reply)
+{
+  MonCommand *r = NULL;
+  uint64_t tid = reply->get_tid();
+
+  if (tid == 0 && !mon_commands.empty()) {
+    r = mon_commands.begin()->second;
+    ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid
+                  << dendl;
+  } else {
+    auto p = mon_commands.find(tid);
+    if (p == mon_commands.end()) {
+      ldout(cct, 10) << __func__ << " " << reply->get_tid() << " not found"
+                    << dendl;
+      reply->put();
+      return;
+    }
+    r = p->second;
+  }
+
+  ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
+  if (r->poutbl)
+    r->poutbl->claim(reply->get_data());
+  _finish_command(r, reply->r, reply->rs);
+  reply->put();
+}
+
 int MonClient::_cancel_mon_command(uint64_t tid)
 {
   ceph_assert(ceph_mutex_is_locked(monc_lock));
@@ -1146,6 +1281,9 @@ void MonClient::_finish_command(MonCommand *r, int ret, string rs)
     *(r->prs) = rs;
   if (r->onfinish)
     finisher.queue(r->onfinish, ret);
+  if (r->target_con) {
+    r->target_con->mark_down();
+  }
   mon_commands.erase(r->tid);
   delete r;
 }
@@ -1286,6 +1424,15 @@ int MonClient::get_auth_request(
   // connection to mon?
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
     ceph_assert(!auth_meta->authorizer);
+    if (con->is_anon()) {
+      for (auto& i : mon_commands) {
+       if (i.second->target_con == con) {
+         return i.second->target_session->get_auth_request(
+           auth_method, preferred_modes, bl,
+           entity_name, want_keys, rotating_secrets.get());
+       }
+      }
+    }
     for (auto& i : pending_cons) {
       if (i.second.is_con(con)) {
        return i.second.get_auth_request(
@@ -1324,6 +1471,14 @@ int MonClient::handle_auth_reply_more(
   std::lock_guard l(monc_lock);
 
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+    if (con->is_anon()) {
+      for (auto& i : mon_commands) {
+       if (i.second->target_con == con) {
+         return i.second->target_session->handle_auth_reply_more(
+           auth_meta, bl, reply);
+       }
+      }
+    }
     for (auto& i : pending_cons) {
       if (i.second.is_con(con)) {
        return i.second.handle_auth_reply_more(auth_meta, bl, reply);
@@ -1353,6 +1508,15 @@ int MonClient::handle_auth_done(
 {
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
     std::lock_guard l(monc_lock);
+    if (con->is_anon()) {
+      for (auto& i : mon_commands) {
+       if (i.second->target_con == con) {
+         return i.second->target_session->handle_auth_done(
+           auth_meta, global_id, bl,
+           session_key, connection_secret);
+       }
+      }
+    }
     for (auto& i : pending_cons) {
       if (i.second.is_con(con)) {
        int r = i.second.handle_auth_done(
@@ -1402,6 +1566,21 @@ int MonClient::handle_auth_bad_method(
 
   std::lock_guard l(monc_lock);
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+    if (con->is_anon()) {
+      for (auto& i : mon_commands) {
+       if (i.second->target_con == con) {
+         int r = i.second->target_session->handle_auth_bad_method(
+           old_auth_method,
+           result,
+           allowed_methods,
+           allowed_modes);
+         if (r < 0) {
+           _finish_command(i.second, r, "auth failed");
+         }
+         return r;
+       }
+      }
+    }
     for (auto& i : pending_cons) {
       if (i.second.is_con(con)) {
        int r = i.second.handle_auth_bad_method(old_auth_method,
@@ -1661,6 +1840,10 @@ int MonConnection::handle_auth_done(
     state = State::HAVE_SESSION;
   }
   con->set_last_keepalive_ack(auth_start);
+
+  if (pending_tell_command) {
+    con->send_message2(std::move(pending_tell_command));
+  }
   return auth_err;
 }
 
@@ -1786,6 +1969,10 @@ int MonConnection::authenticate(MAuthReply *m)
     auth->build_request(ma->auth_payload);
     con->send_message(ma);
   }
+  if (ret == 0 && pending_tell_command) {
+    con->send_message2(std::move(pending_tell_command));
+  }
+
   return ret;
 }
 
index 6341963175c11a91e7079b1fdba90f99f3725514..52c55fb18f321e8ef77b0e5af5426c1a7ce9ad36 100644 (file)
@@ -102,6 +102,9 @@ public:
   bool is_con(Connection *c) const {
     return con.get() == c;
   }
+  void queue_command(Message *m) {
+    pending_tell_command = m;
+  }
 
 private:
   int _negotiate(MAuthReply *m,
@@ -130,6 +133,8 @@ private:
   std::unique_ptr<AuthClientHandler> auth;
   uint64_t global_id;
 
+  MessageRef pending_tell_command;
+
   AuthRegistry *auth_registry;
 };
 
@@ -504,9 +509,14 @@ private:
   uint64_t last_mon_command_tid;
 
   struct MonCommand {
+    // for tell only
     std::string target_name;
     int target_rank;
-    unsigned send_attempts = 0;
+    ConnectionRef target_con;
+    std::unique_ptr<MonConnection> target_session;
+    unsigned send_attempts = 0;  ///< attempt count for legacy mons
+    utime_t last_send_attempt;
+
     uint64_t tid;
     std::vector<std::string> cmd;
     ceph::buffer::list inbl;
@@ -520,15 +530,21 @@ private:
        tid(t),
        poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
     {}
+
+    bool is_tell() const {
+      return target_name.size() || target_rank >= 0;
+    }
   };
   std::map<uint64_t,MonCommand*> mon_commands;
 
   void _send_command(MonCommand *r);
+  void _check_tell_commands();
   void _resend_mon_commands();
   int _cancel_mon_command(uint64_t tid);
   void _finish_command(MonCommand *r, int ret, std::string rs);
   void _finish_auth();
   void handle_mon_command_ack(MMonCommandAck *ack);
+  void handle_command_reply(MCommandReply *reply);
 
 public:
   void start_mon_command(const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,