]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: process commands in a workqueue
authorSage Weil <sage@newdream.net>
Sat, 15 Oct 2011 03:19:38 +0000 (20:19 -0700)
committerSage Weil <sage@newdream.net>
Sat, 15 Oct 2011 03:43:23 +0000 (20:43 -0700)
This lets us do commands that can potentially block.  For example:

 - flush pg stats to osd
 - request (and wait for) latest osdmap

Currently the threadpool only has 1 thread.  i.e., one concurrent command.
That should be fine, methinks.

Signed-off-by: Sage Weil <sage@newdream.net>
src/common/config_opts.h
src/osd/OSD.cc
src/osd/OSD.h

index 700a6ed8a778258fd9569168cfb26d75fa13cc28..a6ab4bfbe2beb5462927adbaba39d620e3022be4 100644 (file)
@@ -250,6 +250,7 @@ OPTION(osd_snap_trim_thread_timeout, OPT_INT, 60*60*1)
 OPTION(osd_scrub_thread_timeout, OPT_INT, 60)
 OPTION(osd_scrub_finalize_thread_timeout, OPT_INT, 60*10)
 OPTION(osd_remove_thread_timeout, OPT_INT, 60*60)
+OPTION(osd_command_thread_timeout, OPT_INT, 10*60)
 OPTION(osd_age, OPT_FLOAT, .8)
 OPTION(osd_age_time, OPT_INT, 0)
 OPTION(osd_heartbeat_interval, OPT_INT, 1)
index 347c7f9869d7aed2cf04f058abc7a8b83568e919..67b5712ccd4dab4a594205f03352450a1bfbda87 100644 (file)
@@ -532,6 +532,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   op_tp(external_messenger->cct, "OSD::op_tp", g_conf->osd_op_threads),
   recovery_tp(external_messenger->cct, "OSD::recovery_tp", g_conf->osd_recovery_threads),
   disk_tp(external_messenger->cct, "OSD::disk_tp", g_conf->osd_disk_threads),
+  command_tp(external_messenger->cct, "OSD::command_tp", 1),
   heartbeat_lock("OSD::heartbeat_lock"),
   heartbeat_stop(false), heartbeat_epoch(0),
   hbin_messenger(hbinm),
@@ -551,6 +552,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   last_tid(0),
   tid_lock("OSD::tid_lock"),
   backlog_wq(this, g_conf->osd_backlog_thread_timeout, &disk_tp),
+  command_wq(this, g_conf->osd_command_thread_timeout, &command_tp),
   recovery_ops_active(0),
   recovery_wq(this, g_conf->osd_recovery_thread_timeout, &recovery_tp),
   remove_list_lock("OSD::remove_list_lock"),
@@ -716,6 +718,7 @@ int OSD::init()
   op_tp.start();
   recovery_tp.start();
   disk_tp.start();
+  command_tp.start();
 
   // start the heartbeat
   heartbeat_thread.create();
@@ -827,6 +830,8 @@ int OSD::shutdown()
   heartbeat_lock.Unlock();
   heartbeat_thread.join();
 
+  command_tp.stop();
+
   // finish ops
   wait_for_no_ops();
   dout(10) << "no ops" << dendl;
@@ -2228,7 +2233,8 @@ void OSD::handle_command(MMonCommand *m)
 {
   if (!require_mon_peer(m))
     return;
-  do_command(NULL, m->get_tid(), m->cmd, m->get_data());
+  Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), NULL);
+  command_wq.queue(c);
   m->put();
 }
 
@@ -2251,7 +2257,8 @@ void OSD::handle_command(MCommand *m)
     return;
   }
 
-  do_command(con, m->get_tid(), m->cmd, m->get_data());
+  Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con);
+  command_wq.queue(c);
 
   m->put();
 }
index 78366ff194cbae02004f8b0d1867d5990381caba..8b488df5ae075d2c198b975e7fd982967b1948d6 100644 (file)
@@ -221,6 +221,7 @@ private:
   ThreadPool op_tp;
   ThreadPool recovery_tp;
   ThreadPool disk_tp;
+  ThreadPool command_tp;
 
   // -- sessions --
 public:
@@ -535,10 +536,6 @@ protected:
   void send_pg_stats(const utime_t &now);
   void handle_pg_stats_ack(class MPGStatsAck *ack);
 
-  void handle_command(class MMonCommand *m);
-  void handle_command(class MCommand *m);
-  void do_command(Connection *con, tid_t tid, vector<string>& cmd, bufferlist& data);
-
   void pg_stat_queue_enqueue(PG *pg) {
     pg_stat_queue_lock.Lock();
     if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
@@ -651,6 +648,65 @@ protected:
   void generate_backlog(PG *pg);
 
 
+  // -- commands --
+  struct Command {
+    vector<string> cmd;
+    tid_t tid;
+    bufferlist indata;
+    Connection *con;
+
+    Command(vector<string>& c, tid_t t, bufferlist& bl, Connection *co)
+      : cmd(c), tid(t), indata(bl), con(co) {
+      if (con)
+       con->get();
+    }
+    ~Command() {
+      if (con)
+       con->put();
+    }
+  };
+  list<Command*> command_queue;
+  struct CommandWQ : public ThreadPool::WorkQueue<Command> {
+    OSD *osd;
+    CommandWQ(OSD *o, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, 0, tp), osd(o) {}
+
+    bool _empty() {
+      return osd->command_queue.empty();
+    }
+    bool _enqueue(Command *c) {
+      osd->command_queue.push_back(c);
+      return true;
+    }
+    void _dequeue(Command *pg) {
+      assert(0);
+    }
+    Command *_dequeue() {
+      if (osd->command_queue.empty())
+       return NULL;
+      Command *c = osd->command_queue.front();
+      osd->command_queue.pop_front();
+      return c;
+    }
+    void _process(Command *c) {
+      osd->osd_lock.Lock();
+      osd->do_command(c->con, c->tid, c->cmd, c->indata);
+      osd->osd_lock.Unlock();
+      delete c;
+    }
+    void _clear() {
+      while (!osd->command_queue.empty()) {
+       Command *c = osd->command_queue.front();
+       osd->command_queue.pop_front();
+       delete c;
+      }
+    }
+  } command_wq;
+
+  void handle_command(class MMonCommand *m);
+  void handle_command(class MCommand *m);
+  void do_command(Connection *con, tid_t tid, vector<string>& cmd, bufferlist& data);
+
   // -- pg recovery --
   xlist<PG*> recovery_queue;
   utime_t defer_recovery_until;