]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/admin_socket: support alternative call_async()
authorSage Weil <sage@redhat.com>
Fri, 6 Sep 2019 21:45:05 +0000 (16:45 -0500)
committerSage Weil <sage@redhat.com>
Tue, 1 Oct 2019 21:30:53 +0000 (16:30 -0500)
Some tell commands reply asynchronously.  Support that through the generic
infrastructure.

Signed-off-by: Sage Weil <sage@redhat.com>
src/common/admin_socket.cc
src/common/admin_socket.h

index 16c811f4e88a6ad5af385c1ad09655015e97017c..e30451821cc4ea71999a67e50a000fffa6f31865 100644 (file)
@@ -21,6 +21,8 @@
 #include "common/safe_io.h"
 #include "common/Thread.h"
 #include "common/version.h"
+#include "common/ceph_mutex.h"
+#include "common/Cond.h"
 
 #include "messages/MCommand.h"
 #include "messages/MCommandReply.h"
@@ -346,9 +348,27 @@ bool AdminSocket::do_accept()
     }
   }
 
-  bufferlist out;
   std::vector<std::string> cmdvec = { c };
-  int rval = execute_command(cmdvec, out);
+  bufferlist out;
+  string rs;
+  int rval = 0;
+
+  bool done = false;
+  ceph::mutex mylock = ceph::make_mutex("admin_socket::do_accept::mylock");
+  ceph::condition_variable mycond;
+  C_SafeCond fin(mylock, mycond, &done, &rval);
+  execute_command(
+    cmdvec,
+    [&rs, &out, &fin](int r, const std::string& err, bufferlist& outbl) {
+      rs = err;
+      out.claim(outbl);
+      fin.finish(r);
+    });
+  {
+    std::unique_lock l{mylock};
+    mycond.wait(l, [&done] { return done;});
+  }
+
   // Unfortunately, the asok wire protocol does not let us pass an error code,
   // and many asok command implementations return helpful error strings.  So,
   // let's interpret all but -ENOSYS as a "success" so that the user can still
@@ -385,31 +405,37 @@ void AdminSocket::do_tell_queue()
   }
   for (auto& m : q) {
     bufferlist outbl;
-    int r = execute_command(m->cmd, outbl);
-    auto reply = new MCommandReply(r, "");
-    reply->set_tid(m->get_tid());
-    reply->set_data(outbl);
-    m->get_connection()->send_message(reply);
+    execute_command(
+      m->cmd,
+      [m](int r, const std::string& err, bufferlist& outbl) {
+       auto reply = new MCommandReply(r, err);
+       reply->set_tid(m->get_tid());
+       reply->set_data(outbl);
+       m->get_connection()->send_message(reply);
+      });
   }
 }
 
-int AdminSocket::execute_command(const std::vector<std::string>& cmdvec,
-                                ceph::bufferlist& out)
+void AdminSocket::execute_command(
+  const std::vector<std::string>& cmdvec,
+  std::function<void(int,const std::string&,bufferlist&)> on_finish)
 {
   cmdmap_t cmdmap;
   string format;
   stringstream errss;
+  bufferlist empty;
   ldout(m_cct,10) << __func__ << " cmdvec='" << cmdvec << "'" << dendl;
   if (!cmdmap_from_json(cmdvec, &cmdmap, errss)) {
     ldout(m_cct, 0) << "AdminSocket: " << errss.str() << dendl;
-    return -EINVAL;
+    return on_finish(-EINVAL, "invalid json", empty);
   }
   string prefix;
   try {
     cmd_getval(m_cct, cmdmap, "format", format);
     cmd_getval(m_cct, cmdmap, "prefix", prefix);
   } catch (const bad_cmd_get& e) {
-    return -EINVAL;
+    return on_finish(-EINVAL, "invalid json, missing format and/or prefix",
+                    empty);
   }
   if (format != "json" && format != "json-pretty" &&
       format != "xml" && format != "xml-pretty")
@@ -421,7 +447,7 @@ int AdminSocket::execute_command(const std::vector<std::string>& cmdvec,
   if (p == hooks.cend()) {
     lderr(m_cct) << "AdminSocket: request '" << cmdvec
                 << "' not defined" << dendl;
-    return -ENOSYS;
+    return on_finish(-EINVAL, "unknown command prefix "s + prefix, empty);
   }
 
   // Drop lock to avoid cycles in cases where the hook takes
@@ -431,27 +457,14 @@ int AdminSocket::execute_command(const std::vector<std::string>& cmdvec,
   in_hook = true;
   auto hook = p->second.hook;
   l.unlock();
-  int r;
-  if (!validate(prefix, cmdmap, out)) {
-    r = -EINVAL;
+  if (!validate(prefix, cmdmap, empty)) {
+    on_finish(-EINVAL, "invalid command json", empty);
   } else {
-    r = hook->call(prefix, cmdmap, format, out);
+    hook->call_async(prefix, cmdmap, format, on_finish);
   }
   l.lock();
   in_hook = false;
   in_hook_cond.notify_all();
-  if (r < 0) {
-    ldout(m_cct, 0) << "AdminSocket: request '" << prefix
-                   << "' to " << hook << " failed, " << cpp_strerror(r)
-                   << dendl;
-    out.append("failed");
-  } else {
-    ldout(m_cct, 5) << "AdminSocket: request '" << prefix
-                   << "' to " << hook
-                   << " returned " << r << " and " << out.length() << " bytes"
-                   << dendl;
-  }
-  return r;
 }
 
 void AdminSocket::queue_tell_command(ref_t<MCommand> m)
index efa222ddedc9a63b347ad9f24e7fccc2538ebe5a..e64fc4fe28db7982091b031cca13153772499646 100644 (file)
@@ -37,6 +37,16 @@ class AdminSocketHook {
 public:
   virtual int call(std::string_view command, const cmdmap_t& cmdmap,
                   std::string_view format, ceph::buffer::list& out) = 0;
+  virtual void call_async(
+    std::string_view command,
+    const cmdmap_t& cmdmap,
+    std::string_view format,
+    std::function<void(int,const std::string&,bufferlist&)> on_finish) {
+    // by default, call the synchronous handler and then finish
+    bufferlist out;
+    int r = call(command, cmdmap, format, out);
+    on_finish(r, "", out);
+  }
   virtual ~AdminSocketHook() {}
 };
 
@@ -83,8 +93,9 @@ public:
 
   void chown(uid_t uid, gid_t gid);
   void chmod(mode_t mode);
-  int execute_command(const std::vector<std::string>& cmd,
-                     ceph::bufferlist& out);
+  void execute_command(
+    const std::vector<std::string>& cmd,
+    std::function<void(int,const std::string&,bufferlist&)> on_fin);
 
   void queue_tell_command(ref_t<MCommand> m);