And make timeout_interval/suicide_interval atomic for WorkQueue_ and BaseShardedWQ.
Signed-off-by: haoyixing <haoyixing@kuaishou.com>
ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
<< " (" << processing << " active)" << dendl;
ul.unlock();
- TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
+ TPHandle tp_handle(cct, hb, wq->timeout_interval.load(), wq->suicide_interval.load());
tp_handle.reset_tp_timeout();
wq->_void_process(item, tp_handle);
ul.lock();
while (pause_threads) {
cct->get_heartbeat_map()->reset_timeout(
hb,
- wq->timeout_interval,
- wq->suicide_interval);
+ wq->timeout_interval.load(),
+ wq->suicide_interval.load());
shardedpool_cond.wait_for(
ul,
std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
while (drain_threads) {
cct->get_heartbeat_map()->reset_timeout(
hb,
- wq->timeout_interval,
- wq->suicide_interval);
+ wq->timeout_interval.load(),
+ wq->suicide_interval.load());
shardedpool_cond.wait_for(
ul,
std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
}
cct->get_heartbeat_map()->reset_timeout(
- hb,
- wq->timeout_interval,
- wq->suicide_interval);
- wq->_process(thread_index, hb);
-
+ hb,
+ wq->timeout_interval.load(),
+ wq->suicide_interval.load());
+ wq->_process(thread_index, hb);
}
ldout(cct,10) << "sharded worker finish" << dendl;
shardedpool_cond.notify_all();
ldout(cct,10) << "drained" << dendl;
}
-
/// Basic interface to a work queue used by the worker threads.
struct WorkQueue_ {
std::string name;
- ceph::timespan timeout_interval;
- ceph::timespan suicide_interval;
+ std::atomic<ceph::timespan> timeout_interval = ceph::timespan::zero();
+ std::atomic<ceph::timespan> suicide_interval = ceph::timespan::zero();
WorkQueue_(std::string n, ceph::timespan ti, ceph::timespan sti)
: name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
{ }
* It can be used for non-thread-safe finalization. */
virtual void _void_process_finish(void *) = 0;
void set_timeout(time_t ti){
- timeout_interval = ceph::make_timespan(ti);
+ timeout_interval.store(ceph::make_timespan(ti));
}
void set_suicide_timeout(time_t sti){
- suicide_interval = ceph::make_timespan(sti);
+ suicide_interval.store(ceph::make_timespan(sti));
}
};
class BaseShardedWQ {
public:
- ceph::timespan timeout_interval, suicide_interval;
+ std::atomic<ceph::timespan> timeout_interval = ceph::timespan::zero();
+ std::atomic<ceph::timespan> suicide_interval = ceph::timespan::zero();
BaseShardedWQ(ceph::timespan ti, ceph::timespan sti)
:timeout_interval(ti), suicide_interval(sti) {}
virtual ~BaseShardedWQ() {}
virtual void return_waiting_threads() = 0;
virtual void stop_return_waiting_threads() = 0;
virtual bool is_shard_empty(uint32_t thread_index) = 0;
+ void set_timeout(time_t ti) {
+ timeout_interval.store(ceph::make_timespan(ti));
+ }
+ void set_suicide_timeout(time_t sti) {
+ suicide_interval.store(ceph::make_timespan(sti));
+ }
};
template <typename T>
void* item = wq->_void_dequeue();
if (item) {
processing++;
- TPHandle tp_handle(cct, nullptr, wq->timeout_interval, wq->suicide_interval);
+ TPHandle tp_handle(cct, nullptr, wq->timeout_interval.load(), wq->suicide_interval.load());
wq->_void_process(item, tp_handle);
processing--;
}
if (batch.entry_count) {
TPHandle tp_handle(store->cct,
nullptr,
- timeout_interval,
- suicide_interval);
+ timeout_interval.load(),
+ suicide_interval.load());
ceph_assert(batch.running == 0);
batch.running++; // just to be on-par with the regular call
"osd_object_clean_region_max_num_intervals",
"osd_scrub_min_interval",
"osd_scrub_max_interval",
+ "osd_op_thread_timeout",
+ "osd_op_thread_suicide_timeout",
NULL
};
return KEYS;
service.poolctx.stop();
service.poolctx.start(conf.get_val<std::uint64_t>("osd_asio_thread_count"));
}
+ if (changed.count("osd_op_thread_timeout")) {
+ op_shardedwq.set_timeout(g_conf().get_val<int64_t>("osd_op_thread_timeout"));
+ }
+ if (changed.count("osd_op_thread_suicide_timeout")) {
+ op_shardedwq.set_suicide_timeout(g_conf().get_val<int64_t>("osd_op_thread_suicide_timeout"));
+ }
}
void OSD::maybe_override_max_osd_capacity_for_qos()
}
// found a work item; reapply default wq timeouts
osd->cct->get_heartbeat_map()->reset_timeout(hb,
- timeout_interval, suicide_interval);
+ timeout_interval.load(), suicide_interval.load());
} else {
dout(20) << __func__ << " need return immediately" << dendl;
wait_lock.unlock();
sdata->shard_lock.lock();
// Reapply default wq timeouts
osd->cct->get_heartbeat_map()->reset_timeout(hb,
- timeout_interval, suicide_interval);
+ timeout_interval.load(), suicide_interval.load());
// Populate the oncommits list if there were any additions
// to the context_queue while we were waiting
if (is_smallest_thread_index) {
<< " waiting " << slot->waiting
<< " waiting_peering " << slot->waiting_peering << dendl;
- ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
- suicide_interval);
+ ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval.load(),
+ suicide_interval.load());
// take next item
auto qi = std::move(slot->to_process.front());
tp.start();
twq wq(2, 20, &tp);
// check timeout and suicide
- ASSERT_EQ(ceph::make_timespan(2), wq.timeout_interval);
- ASSERT_EQ(ceph::make_timespan(20), wq.suicide_interval);
+ ASSERT_EQ(ceph::make_timespan(2), wq.timeout_interval.load());
+ ASSERT_EQ(ceph::make_timespan(20), wq.suicide_interval.load());
// change the timeout and suicide and then check them
wq.set_timeout(4);
wq.set_suicide_timeout(40);
- ASSERT_EQ(ceph::make_timespan(4), wq.timeout_interval);
- ASSERT_EQ(ceph::make_timespan(40), wq.suicide_interval);
+ ASSERT_EQ(ceph::make_timespan(4), wq.timeout_interval.load());
+ ASSERT_EQ(ceph::make_timespan(40), wq.suicide_interval.load());
tp.stop();
}