OPTION(osd_scrub_thread_timeout, OPT_INT, 60)
OPTION(osd_scrub_finalize_thread_timeout, OPT_INT, 60*10)
OPTION(osd_remove_thread_timeout, OPT_INT, 60*60)
+OPTION(osd_command_thread_timeout, OPT_INT, 10*60)
OPTION(osd_age, OPT_FLOAT, .8)
OPTION(osd_age_time, OPT_INT, 0)
OPTION(osd_heartbeat_interval, OPT_INT, 1)
op_tp(external_messenger->cct, "OSD::op_tp", g_conf->osd_op_threads),
recovery_tp(external_messenger->cct, "OSD::recovery_tp", g_conf->osd_recovery_threads),
disk_tp(external_messenger->cct, "OSD::disk_tp", g_conf->osd_disk_threads),
+ command_tp(external_messenger->cct, "OSD::command_tp", 1),
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false), heartbeat_epoch(0),
hbin_messenger(hbinm),
last_tid(0),
tid_lock("OSD::tid_lock"),
backlog_wq(this, g_conf->osd_backlog_thread_timeout, &disk_tp),
+ command_wq(this, g_conf->osd_command_thread_timeout, &command_tp),
recovery_ops_active(0),
recovery_wq(this, g_conf->osd_recovery_thread_timeout, &recovery_tp),
remove_list_lock("OSD::remove_list_lock"),
op_tp.start();
recovery_tp.start();
disk_tp.start();
+ command_tp.start();
// start the heartbeat
heartbeat_thread.create();
heartbeat_lock.Unlock();
heartbeat_thread.join();
+ command_tp.stop();
+
// finish ops
wait_for_no_ops();
dout(10) << "no ops" << dendl;
{
if (!require_mon_peer(m))
return;
- do_command(NULL, m->get_tid(), m->cmd, m->get_data());
+ Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), NULL);
+ command_wq.queue(c);
m->put();
}
return;
}
- do_command(con, m->get_tid(), m->cmd, m->get_data());
+ Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con);
+ command_wq.queue(c);
m->put();
}
ThreadPool op_tp;
ThreadPool recovery_tp;
ThreadPool disk_tp;
+ ThreadPool command_tp;
// -- sessions --
public:
void send_pg_stats(const utime_t &now);
void handle_pg_stats_ack(class MPGStatsAck *ack);
- void handle_command(class MMonCommand *m);
- void handle_command(class MCommand *m);
- void do_command(Connection *con, tid_t tid, vector<string>& cmd, bufferlist& data);
-
void pg_stat_queue_enqueue(PG *pg) {
pg_stat_queue_lock.Lock();
if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
void generate_backlog(PG *pg);
+ // -- commands --
+ struct Command {
+ vector<string> cmd;
+ tid_t tid;
+ bufferlist indata;
+ Connection *con;
+
+ Command(vector<string>& c, tid_t t, bufferlist& bl, Connection *co)
+ : cmd(c), tid(t), indata(bl), con(co) {
+ if (con)
+ con->get();
+ }
+ ~Command() {
+ if (con)
+ con->put();
+ }
+ };
+ list<Command*> command_queue;
+ struct CommandWQ : public ThreadPool::WorkQueue<Command> {
+ OSD *osd;
+ CommandWQ(OSD *o, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, 0, tp), osd(o) {}
+
+ bool _empty() {
+ return osd->command_queue.empty();
+ }
+ bool _enqueue(Command *c) {
+ osd->command_queue.push_back(c);
+ return true;
+ }
+ void _dequeue(Command *pg) {
+ assert(0);
+ }
+ Command *_dequeue() {
+ if (osd->command_queue.empty())
+ return NULL;
+ Command *c = osd->command_queue.front();
+ osd->command_queue.pop_front();
+ return c;
+ }
+ void _process(Command *c) {
+ osd->osd_lock.Lock();
+ osd->do_command(c->con, c->tid, c->cmd, c->indata);
+ osd->osd_lock.Unlock();
+ delete c;
+ }
+ void _clear() {
+ while (!osd->command_queue.empty()) {
+ Command *c = osd->command_queue.front();
+ osd->command_queue.pop_front();
+ delete c;
+ }
+ }
+ } command_wq;
+
+ void handle_command(class MMonCommand *m);
+ void handle_command(class MCommand *m);
+ void do_command(Connection *con, tid_t tid, vector<string>& cmd, bufferlist& data);
+
// -- pg recovery --
xlist<PG*> recovery_queue;
utime_t defer_recovery_until;