{
ceph_assert(ceph_mutex_is_not_locked(client_lock));
+ if (upkeeper.joinable())
+ upkeeper.join();
+
// It is necessary to hold client_lock, because any inode destruction
// may call into ObjectCacher, which asserts that it's lock (which is
// client_lock) is held.
// MDS commands, we may have sessions that need closing.
{
std::scoped_lock l{client_lock};
+
+    // To make sure the tick thread will be stopped before
+    // destructing the Client, just in case the _mount()
+    // failed but didn't get a chance to stop the tick
+    // thread
+ tick_thread_stopped = true;
+ upkeep_cond.notify_one();
+
_close_sessions();
}
cct->_conf.remove_observer(this);
return r;
}
- cl.unlock();
- tick(); // start tick
- cl.lock();
+ start_tick_thread(); // start tick thread
if (require_mds) {
while (1) {
traceout.close();
}
- {
- std::scoped_lock l(timer_lock);
- if (tick_event)
- timer.cancel_event(tick_event);
- tick_event = 0;
- }
+ // stop the tick thread
+ tick_thread_stopped = true;
+ upkeep_cond.notify_one();
_close_sessions();
{
ldout(cct, 20) << "tick" << dendl;
- {
- std::scoped_lock l(timer_lock);
- tick_event = timer.add_event_after(
- cct->_conf->client_tick_interval,
- new LambdaContext([this](int) {
- tick();
- }));
- }
-
- if (cct->_conf->client_debug_inject_tick_delay > 0) {
- sleep(cct->_conf->client_debug_inject_tick_delay);
- ceph_assert(0 == cct->_conf.set_val("client_debug_inject_tick_delay", "0"));
- cct->_conf.apply_changes(nullptr);
- }
-
utime_t now = ceph_clock_now();
- std::scoped_lock cl(client_lock);
/*
* If the mount() is not finished
*/
}
}
+// Spawn the upkeep ("tick") thread.  It calls tick() roughly every
+// "client_tick_interval" seconds until tick_thread_stopped is set and
+// upkeep_cond is notified by the stop path (unmount/destructor).
+void Client::start_tick_thread()
+{
+  upkeeper = std::thread([this]() {
+    using time = ceph::coarse_mono_time;
+    using sec = std::chrono::seconds;
+
+    // time::min() makes the very first iteration see an enormous
+    // "since", so tick() fires immediately on thread start.
+    auto last_tick = time::min();
+
+    // client_lock is held for the whole loop except while blocked in
+    // wait_for() below, which releases it.
+    std::unique_lock cl(client_lock);
+    while (!tick_thread_stopped) {
+      auto now = clock::now();
+      auto since = now - last_tick;
+
+      // Re-read both intervals each iteration so runtime config
+      // changes take effect without restarting the thread.
+      auto t_interval = clock::duration(cct->_conf.get_val<sec>("client_tick_interval"));
+      auto d_interval = clock::duration(cct->_conf.get_val<sec>("client_debug_inject_tick_delay"));
+
+      // Clear the debug inject tick delay
+      if (unlikely(d_interval.count() > 0)) {
+        ldout(cct, 20) << "clear debug inject tick delay: " << d_interval << dendl;
+        ceph_assert(0 == cct->_conf.set_val("client_debug_inject_tick_delay", "0"));
+        cct->_conf.apply_changes(nullptr);
+      }
+
+      // A non-zero injected delay (read above, before being cleared)
+      // stretches the effective interval for this one iteration.
+      auto interval = std::max(t_interval, d_interval);
+      if (likely(since >= interval)) {
+        tick();
+        last_tick = clock::now();
+      } else {
+        // Not due yet: only wait out the remaining time.
+        interval -= since;
+      }
+
+      ldout(cct, 20) << "upkeep thread waiting interval " << interval << dendl;
+      // Re-check the stop flag before sleeping; the stop path sets it
+      // and calls notify_one() under client_lock, so wait_for() (which
+      // drops client_lock) wakes promptly on shutdown.
+      if (!tick_thread_stopped)
+        upkeep_cond.wait_for(cl, interval);
+    }
+  });
+}
+
void Client::collect_and_send_metrics() {
ldout(cct, 20) << __func__ << dendl;
#include <memory>
#include <set>
#include <string>
+#include <thread>
using std::set;
using std::map;
template <typename T> friend class RWRef;
using Dispatcher::cct;
+ using clock = ceph::coarse_mono_clock;
typedef int (*add_dirent_cb_t)(void *p, struct dirent *de, struct ceph_statx *stx, off_t off, Inode *in);
void flush_cap_releases();
void renew_and_flush_cap_releases();
void tick();
+ void start_tick_thread();
void inc_dentry_nr() {
++dentry_nr;
xlist<Inode*> &get_dirty_list() { return dirty_list; }
- /* timer_lock for 'timer' and 'tick_event' */
+ /* timer_lock for 'timer' */
ceph::mutex timer_lock = ceph::make_mutex("Client::timer_lock");
- Context *tick_event = nullptr;
SafeTimer timer;
+ /* tick thread */
+ std::thread upkeeper;
+ ceph::condition_variable upkeep_cond;
+ bool tick_thread_stopped = false;
+
std::unique_ptr<PerfCounters> logger;
std::unique_ptr<MDSMap> mdsmap;
OPTION(client_cache_mid, OPT_FLOAT)
OPTION(client_use_random_mds, OPT_BOOL)
OPTION(client_mount_timeout, OPT_DOUBLE)
-OPTION(client_tick_interval, OPT_DOUBLE)
OPTION(client_trace, OPT_STR)
OPTION(client_readahead_min, OPT_LONGLONG) // readahead at _least_ this much.
OPTION(client_readahead_max_bytes, OPT_LONGLONG) // default unlimited
OPTION(client_oc_max_objects, OPT_INT) // max objects in cache
OPTION(client_debug_getattr_caps, OPT_BOOL) // check if MDS reply contains wanted caps
OPTION(client_debug_force_sync_read, OPT_BOOL) // always read synchronously (go to osds)
-OPTION(client_debug_inject_tick_delay, OPT_INT) // delay the client tick for a number of seconds
OPTION(client_max_inline_size, OPT_U64)
OPTION(client_inject_release_failure, OPT_BOOL) // synthetic client bug for testing
OPTION(client_inject_fixed_oldest_tid, OPT_BOOL) // synthetic client bug for testing
.set_default(300.0)
.set_description("timeout for mounting CephFS (seconds)"),
- Option("client_tick_interval", Option::TYPE_FLOAT, Option::LEVEL_DEV)
- .set_default(1.0)
+ Option("client_tick_interval", Option::TYPE_SECS, Option::LEVEL_DEV)
+ .set_default(1)
.set_description("seconds between client upkeep ticks"),
Option("client_trace", Option::TYPE_STR, Option::LEVEL_DEV)
.set_default(false)
.set_description(""),
- Option("client_debug_inject_tick_delay", Option::TYPE_INT, Option::LEVEL_DEV)
+ Option("client_debug_inject_tick_delay", Option::TYPE_SECS, Option::LEVEL_DEV)
.set_default(0)
.set_description(""),