From 1cbcc9535a85cc7ba52cef597fb831112df2f2fc Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 14 Oct 2011 20:19:38 -0700 Subject: [PATCH] osd: process commands in a workqueue This lets us do commands that can potentially block. For example: - flush pg stats to osd - request (and wait for) latest osdmap Currently the threadpool only has 1 thread. i.e., one concurrent command. That should be fine, methinks. Signed-off-by: Sage Weil --- src/common/config_opts.h | 1 + src/osd/OSD.cc | 11 +++++-- src/osd/OSD.h | 64 +++++++++++++++++++++++++++++++++++++--- 3 files changed, 70 insertions(+), 6 deletions(-) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 700a6ed8a7782..a6ab4bfbe2beb 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -250,6 +250,7 @@ OPTION(osd_snap_trim_thread_timeout, OPT_INT, 60*60*1) OPTION(osd_scrub_thread_timeout, OPT_INT, 60) OPTION(osd_scrub_finalize_thread_timeout, OPT_INT, 60*10) OPTION(osd_remove_thread_timeout, OPT_INT, 60*60) +OPTION(osd_command_thread_timeout, OPT_INT, 10*60) OPTION(osd_age, OPT_FLOAT, .8) OPTION(osd_age_time, OPT_INT, 0) OPTION(osd_heartbeat_interval, OPT_INT, 1) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 347c7f9869d7a..67b5712ccd4da 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -532,6 +532,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, op_tp(external_messenger->cct, "OSD::op_tp", g_conf->osd_op_threads), recovery_tp(external_messenger->cct, "OSD::recovery_tp", g_conf->osd_recovery_threads), disk_tp(external_messenger->cct, "OSD::disk_tp", g_conf->osd_disk_threads), + command_tp(external_messenger->cct, "OSD::command_tp", 1), heartbeat_lock("OSD::heartbeat_lock"), heartbeat_stop(false), heartbeat_epoch(0), hbin_messenger(hbinm), @@ -551,6 +552,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, last_tid(0), tid_lock("OSD::tid_lock"), backlog_wq(this, g_conf->osd_backlog_thread_timeout, &disk_tp), + command_wq(this, g_conf->osd_command_thread_timeout, &command_tp), recovery_ops_active(0), recovery_wq(this, g_conf->osd_recovery_thread_timeout, &recovery_tp), remove_list_lock("OSD::remove_list_lock"), @@ -716,6 +718,7 @@ int OSD::init() op_tp.start(); recovery_tp.start(); disk_tp.start(); + command_tp.start(); // start the heartbeat heartbeat_thread.create(); @@ -827,6 +830,8 @@ int OSD::shutdown() heartbeat_lock.Unlock(); heartbeat_thread.join(); + command_tp.stop(); + // finish ops wait_for_no_ops(); dout(10) << "no ops" << dendl; @@ -2228,7 +2233,8 @@ void OSD::handle_command(MMonCommand *m) { if (!require_mon_peer(m)) return; - do_command(NULL, m->get_tid(), m->cmd, m->get_data()); + Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), NULL); + command_wq.queue(c); m->put(); } @@ -2251,7 +2257,8 @@ void OSD::handle_command(MCommand *m) return; } - do_command(con, m->get_tid(), m->cmd, m->get_data()); + Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con); + command_wq.queue(c); m->put(); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 78366ff194cba..8b488df5ae075 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -221,6 +221,7 @@ private: ThreadPool op_tp; ThreadPool recovery_tp; ThreadPool disk_tp; + ThreadPool command_tp; // -- sessions -- public: @@ -535,10 +536,6 @@ protected: void send_pg_stats(const utime_t &now); void handle_pg_stats_ack(class MPGStatsAck *ack); - void handle_command(class MMonCommand *m); - void handle_command(class MCommand *m); - void do_command(Connection *con, tid_t tid, vector& cmd, bufferlist& data); - void pg_stat_queue_enqueue(PG *pg) { pg_stat_queue_lock.Lock(); if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) { @@ -651,6 +648,65 @@ protected: void generate_backlog(PG *pg); + // -- commands -- + struct Command { + vector cmd; + tid_t tid; + bufferlist indata; + Connection *con; + + Command(vector& c, tid_t t, bufferlist& bl, Connection *co) + : cmd(c), tid(t), indata(bl), con(co) { + if (con) + con->get(); + } + ~Command() { + if (con) + con->put(); + } + }; + list command_queue; + struct CommandWQ : public ThreadPool::WorkQueue { + OSD *osd; + CommandWQ(OSD *o, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue("OSD::CommandWQ", ti, 0, tp), osd(o) {} + + bool _empty() { + return osd->command_queue.empty(); + } + bool _enqueue(Command *c) { + osd->command_queue.push_back(c); + return true; + } + void _dequeue(Command *pg) { + assert(0); + } + Command *_dequeue() { + if (osd->command_queue.empty()) + return NULL; + Command *c = osd->command_queue.front(); + osd->command_queue.pop_front(); + return c; + } + void _process(Command *c) { + osd->osd_lock.Lock(); + osd->do_command(c->con, c->tid, c->cmd, c->indata); + osd->osd_lock.Unlock(); + delete c; + } + void _clear() { + while (!osd->command_queue.empty()) { + Command *c = osd->command_queue.front(); + osd->command_queue.pop_front(); + delete c; + } + } + } command_wq; + + void handle_command(class MMonCommand *m); + void handle_command(class MCommand *m); + void do_command(Connection *con, tid_t tid, vector& cmd, bufferlist& data); + // -- pg recovery -- xlist recovery_queue; utime_t defer_recovery_until; -- 2.39.5