* so at most one copy will execute simultaneously for a given thread pool.
* It can be used for non-thread-safe finalization. */
virtual void _void_process_finish(void *) = 0;
+ void set_timeout(time_t ti){
+ timeout_interval = ceph::make_timespan(ti);
+ }
+ void set_suicide_timeout(time_t sti){
+ suicide_interval = ceph::make_timespan(sti);
+ }
};
// track thread pool size changes
"filestore_sloppy_crc",
"filestore_sloppy_crc_block_size",
"filestore_max_alloc_hint_size",
+ "filestore_op_thread_suicide_timeout",
+ "filestore_op_thread_timeout",
NULL
};
return KEYS;
dump_stop();
}
}
+ if (changed.count("filestore_op_thread_timeout")){
+ op_wq.set_timeout(g_conf().get_val<int64_t>("filestore_op_thread_timeout"));
+ }
+ if (changed.count("filestore_op_thread_suicide_timeout")){
+ op_wq.set_suicide_timeout(g_conf().get_val<int64_t>("filestore_op_thread_suicide_timeout"));
+ }
}
int FileStore::set_throttle_params()
sleep(1);
tp.stop();
}
+
+class twq : public ThreadPool::WorkQueue<int> {
+public:
+ twq(time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+ : ThreadPool::WorkQueue<int>("test_wq", ceph::make_timespan(timeout), ceph::make_timespan(suicide_timeout), tp) {}
+
+ bool _enqueue(int* item) override {
+ return true;
+ }
+ void _dequeue(int* item) override {
+ ceph_abort();
+ }
+ bool _empty() override {
+ return true;
+ }
+ int *_dequeue() override {
+ return nullptr;
+ }
+ void _process(int *osr, ThreadPool::TPHandle &handle) override {
+ }
+ void _process_finish(int *osr) override {
+ }
+ void _clear() override {
+ }
+};
+
+TEST(WorkQueue, change_timeout){
+ ThreadPool tp(g_ceph_context, "bar", "tp_bar", 2, "filestore_op_threads");
+ tp.start();
+ twq wq(2, 20, &tp);
+ // check timeout and suicide
+ ASSERT_EQ(ceph::make_timespan(2), wq.timeout_interval);
+ ASSERT_EQ(ceph::make_timespan(20), wq.suicide_interval);
+
+ // change the timeout and suicide and then check them
+ wq.set_timeout(4);
+ wq.set_suicide_timeout(40);
+ ASSERT_EQ(ceph::make_timespan(4), wq.timeout_interval);
+ ASSERT_EQ(ceph::make_timespan(40), wq.suicide_interval);
+ tp.stop();
+}