#include "common/Thread.h"
#include "common/version.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
// re-include our assert to clobber the system one; fix dout:
#include "include/ceph_assert.h"
// read off one byte
char buf;
::read(m_wakeup_rd_fd, &buf, 1);
+ do_tell_queue();
}
if (m_shutdown) {
// Parent wants us to shut down
return rval;
}
+void AdminSocket::do_tell_queue()
+{
+ ldout(m_cct,10) << __func__ << dendl;
+ std::list<ref_t<MCommand>> q;
+ {
+ std::lock_guard l(tell_lock);
+ q.swap(tell_queue);
+ }
+ for (auto& m : q) {
+ bufferlist outbl;
+ bool success = execute_command(m->cmd, outbl);
+ int r = success ? 0 : -1; // FIXME!
+ auto reply = new MCommandReply(r, "");
+ reply->set_tid(m->get_tid());
+ reply->set_data(outbl);
+ m->get_connection()->send_message(reply);
+ }
+}
+
int AdminSocket::execute_command(const std::vector<std::string>& cmdvec,
ceph::bufferlist& out)
{
return true;
}
+void AdminSocket::queue_tell_command(ref_t<MCommand> m)
+{
+ ldout(m_cct,10) << __func__ << " " << *m << dendl;
+ std::lock_guard l(tell_lock);
+ tell_queue.push_back(std::move(m));
+ wakeup();
+}
bool AdminSocket::validate(const std::string& command,
remove_cleanup_file(m_path);
m_path.clear();
}
+
+void AdminSocket::wakeup()
+{
+ // Send a byte to the wakeup pipe that the thread is listening to
+ char buf[1] = { 0x0 };
+ int r = safe_write(m_wakeup_wr_fd, buf, sizeof(buf));
+ (void)r;
+}
#include <thread>
#include "include/buffer.h"
+#include "common/ref.h"
#include "common/cmdparse.h"
class AdminSocket;
class CephContext;
+class MCommand;
using namespace std::literals;
int execute_command(const std::vector<std::string>& cmd,
ceph::bufferlist& out);
+ void queue_tell_command(ref_t<MCommand> m);
+
private:
void shutdown();
+ void wakeup();
std::string create_wakeup_pipe(int *pipe_rd, int *pipe_wr);
std::string destroy_wakeup_pipe();
std::thread th;
void entry() noexcept;
bool do_accept();
+ void do_tell_queue();
bool validate(const std::string& command,
const cmdmap_t& cmdmap,
ceph::buffer::list& out) const;
std::unique_ptr<AdminSocketHook> help_hook;
std::unique_ptr<AdminSocketHook> getdescs_hook;
+ std::mutex tell_lock;
+ std::list<ref_t<MCommand>> tell_queue;
+
struct hook_info {
AdminSocketHook* hook;
std::string desc;