From 3c381398aa1a8ca1d98c905d833dd38203527165 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 6 Sep 2019 16:45:05 -0500 Subject: [PATCH] common/admin_socket: support alternative call_async() Some tell commands reply asynchronously. Support that through the generic infrastructure. Signed-off-by: Sage Weil --- src/common/admin_socket.cc | 69 ++++++++++++++++++++++---------------- src/common/admin_socket.h | 15 +++++++-- 2 files changed, 54 insertions(+), 30 deletions(-) diff --git a/src/common/admin_socket.cc b/src/common/admin_socket.cc index 16c811f4e88a6..e30451821cc4e 100644 --- a/src/common/admin_socket.cc +++ b/src/common/admin_socket.cc @@ -21,6 +21,8 @@ #include "common/safe_io.h" #include "common/Thread.h" #include "common/version.h" +#include "common/ceph_mutex.h" +#include "common/Cond.h" #include "messages/MCommand.h" #include "messages/MCommandReply.h" @@ -346,9 +348,27 @@ bool AdminSocket::do_accept() } } - bufferlist out; std::vector cmdvec = { c }; - int rval = execute_command(cmdvec, out); + bufferlist out; + string rs; + int rval = 0; + + bool done = false; + ceph::mutex mylock = ceph::make_mutex("admin_socket::do_accept::mylock"); + ceph::condition_variable mycond; + C_SafeCond fin(mylock, mycond, &done, &rval); + execute_command( + cmdvec, + [&rs, &out, &fin](int r, const std::string& err, bufferlist& outbl) { + rs = err; + out.claim(outbl); + fin.finish(r); + }); + { + std::unique_lock l{mylock}; + mycond.wait(l, [&done] { return done;}); + } + // Unfortunately, the asok wire protocol does not let us pass an error code, // and many asok command implementations return helpful error strings. So, // let's interpret all but -ENOSYS as a "success" so that the user can still @@ -385,31 +405,37 @@ void AdminSocket::do_tell_queue() } for (auto& m : q) { bufferlist outbl; - int r = execute_command(m->cmd, outbl); - auto reply = new MCommandReply(r, ""); - reply->set_tid(m->get_tid()); - reply->set_data(outbl); - m->get_connection()->send_message(reply); + execute_command( + m->cmd, + [m](int r, const std::string& err, bufferlist& outbl) { + auto reply = new MCommandReply(r, err); + reply->set_tid(m->get_tid()); + reply->set_data(outbl); + m->get_connection()->send_message(reply); + }); } } -int AdminSocket::execute_command(const std::vector& cmdvec, - ceph::bufferlist& out) +void AdminSocket::execute_command( + const std::vector& cmdvec, + std::function on_finish) { cmdmap_t cmdmap; string format; stringstream errss; + bufferlist empty; ldout(m_cct,10) << __func__ << " cmdvec='" << cmdvec << "'" << dendl; if (!cmdmap_from_json(cmdvec, &cmdmap, errss)) { ldout(m_cct, 0) << "AdminSocket: " << errss.str() << dendl; - return -EINVAL; + return on_finish(-EINVAL, "invalid json", empty); } string prefix; try { cmd_getval(m_cct, cmdmap, "format", format); cmd_getval(m_cct, cmdmap, "prefix", prefix); } catch (const bad_cmd_get& e) { - return -EINVAL; + return on_finish(-EINVAL, "invalid json, missing format and/or prefix", + empty); } if (format != "json" && format != "json-pretty" && format != "xml" && format != "xml-pretty") @@ -421,7 +447,7 @@ int AdminSocket::execute_command(const std::vector& cmdvec, if (p == hooks.cend()) { lderr(m_cct) << "AdminSocket: request '" << cmdvec << "' not defined" << dendl; - return -ENOSYS; + return on_finish(-EINVAL, "unknown command prefix "s + prefix, empty); } // Drop lock to avoid cycles in cases where the hook takes @@ -431,27 +457,14 @@ int AdminSocket::execute_command(const std::vector& cmdvec, in_hook = true; auto hook = p->second.hook; l.unlock(); - int r; - if (!validate(prefix, cmdmap, out)) { - r = -EINVAL; + if (!validate(prefix, cmdmap, empty)) { + on_finish(-EINVAL, "invalid command json", empty); } else { - r = hook->call(prefix, cmdmap, format, out); + hook->call_async(prefix, cmdmap, format, on_finish); } l.lock(); in_hook = false; in_hook_cond.notify_all(); - if (r < 0) { - ldout(m_cct, 0) << "AdminSocket: request '" << prefix - << "' to " << hook << " failed, " << cpp_strerror(r) - << dendl; - out.append("failed"); - } else { - ldout(m_cct, 5) << "AdminSocket: request '" << prefix - << "' to " << hook - << " returned " << r << " and " << out.length() << " bytes" - << dendl; - } - return r; } void AdminSocket::queue_tell_command(ref_t m) diff --git a/src/common/admin_socket.h b/src/common/admin_socket.h index efa222ddedc9a..e64fc4fe28db7 100644 --- a/src/common/admin_socket.h +++ b/src/common/admin_socket.h @@ -37,6 +37,16 @@ class AdminSocketHook { public: virtual int call(std::string_view command, const cmdmap_t& cmdmap, std::string_view format, ceph::buffer::list& out) = 0; + virtual void call_async( + std::string_view command, + const cmdmap_t& cmdmap, + std::string_view format, + std::function on_finish) { + // by default, call the synchronous handler and then finish + bufferlist out; + int r = call(command, cmdmap, format, out); + on_finish(r, "", out); + } virtual ~AdminSocketHook() {} }; @@ -83,8 +93,9 @@ public: void chown(uid_t uid, gid_t gid); void chmod(mode_t mode); - int execute_command(const std::vector& cmd, - ceph::bufferlist& out); + void execute_command( + const std::vector& cmd, + std::function on_fin); void queue_tell_command(ref_t m); -- 2.39.5