]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/admin_socket: add ability to process MCommand via asok queue
authorSage Weil <sage@redhat.com>
Thu, 5 Sep 2019 19:24:30 +0000 (14:24 -0500)
committerSage Weil <sage@redhat.com>
Tue, 1 Oct 2019 21:30:52 +0000 (16:30 -0500)
Signed-off-by: Sage Weil <sage@redhat.com>
src/common/admin_socket.cc
src/common/admin_socket.h

index cea62e5f0e3e8a607f9bb7d1b4f848057e5c7f61..ef22b3c2615a38d072892347944b6c7cb966b697 100644 (file)
@@ -22,6 +22,8 @@
 #include "common/Thread.h"
 #include "common/version.h"
 
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
 
 // re-include our assert to clobber the system one; fix dout:
 #include "include/ceph_assert.h"
@@ -246,6 +248,7 @@ void AdminSocket::entry() noexcept
       // read off one byte
       char buf;
       ::read(m_wakeup_rd_fd, &buf, 1);
+      do_tell_queue();
     }
     if (m_shutdown) {
       // Parent wants us to shut down
@@ -364,6 +367,25 @@ bool AdminSocket::do_accept()
   return rval;
 }
 
+void AdminSocket::do_tell_queue()
+{
+  ldout(m_cct,10) << __func__ << dendl;
+  std::list<ref_t<MCommand>> q;
+  {
+    std::lock_guard l(tell_lock);
+    q.swap(tell_queue);
+  }
+  for (auto& m : q) {
+    bufferlist outbl;
+    bool success = execute_command(m->cmd, outbl);
+    int r = success ? 0 : -1;  // FIXME!
+    auto reply = new MCommandReply(r, "");
+    reply->set_tid(m->get_tid());
+    reply->set_data(outbl);
+    m->get_connection()->send_message(reply);
+  }
+}
+
 int AdminSocket::execute_command(const std::vector<std::string>& cmdvec,
                                 ceph::bufferlist& out)
 {
@@ -433,6 +455,13 @@ int AdminSocket::execute_command(const std::vector<std::string>& cmdvec,
   return true;
 }
 
+void AdminSocket::queue_tell_command(ref_t<MCommand> m)
+{
+  ldout(m_cct,10) << __func__ << " " << *m << dendl;
+  std::lock_guard l(tell_lock);
+  tell_queue.push_back(std::move(m));
+  wakeup();
+}
 
 
 bool AdminSocket::validate(const std::string& command,
@@ -668,3 +697,11 @@ void AdminSocket::shutdown()
   remove_cleanup_file(m_path);
   m_path.clear();
 }
+
+void AdminSocket::wakeup()
+{
+  // Send a byte to the wakeup pipe that the thread is listening to
+  char buf[1] = { 0x0 };
+  int r = safe_write(m_wakeup_wr_fd, buf, sizeof(buf));
+  (void)r;
+}
index 2e10fd074725f237869654fd81b320390837ceed..5e7d56b3f2fd5edb761bb46e3ea2c16b62cd61ac 100644 (file)
 #include <thread>
 
 #include "include/buffer.h"
+#include "common/ref.h"
 #include "common/cmdparse.h"
 
 class AdminSocket;
 class CephContext;
+class MCommand;
 
 using namespace std::literals;
 
@@ -97,9 +99,12 @@ public:
   int execute_command(const std::vector<std::string>& cmd,
                      ceph::bufferlist& out);
 
+  void queue_tell_command(ref_t<MCommand> m);
+
 private:
 
   void shutdown();
+  void wakeup();
 
   std::string create_wakeup_pipe(int *pipe_rd, int *pipe_wr);
   std::string destroy_wakeup_pipe();
@@ -108,6 +113,7 @@ private:
   std::thread th;
   void entry() noexcept;
   bool do_accept();
+  void do_tell_queue();
   bool validate(const std::string& command,
                const cmdmap_t& cmdmap,
                ceph::buffer::list& out) const;
@@ -126,6 +132,9 @@ private:
   std::unique_ptr<AdminSocketHook> help_hook;
   std::unique_ptr<AdminSocketHook> getdescs_hook;
 
+  std::mutex tell_lock;
+  std::list<ref_t<MCommand>> tell_queue;
+
   struct hook_info {
     AdminSocketHook* hook;
     std::string desc;