discard_callback(d_cb),
discard_callback_priv(d_cbpriv),
aio_stop(false),
- discard_started(false),
- discard_stop(false),
aio_thread(this),
- discard_thread(this),
injecting_crash(0)
{
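+ // Register as a config observer so changes to
+ // bdev_async_discard_threads take effect at runtime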
+ cct->_conf.add_observer(this);
fd_directs.resize(WRITE_LIFE_MAX, -1);
fd_buffereds.resize(WRITE_LIFE_MAX, -1);
}
}
+KernelDevice::~KernelDevice()
+{
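+ // Unregister first so no config callback can race with teardown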
+ cct->_conf.remove_observer(this);
+}
+
int KernelDevice::_lock()
{
// When the block changes, systemd-udevd will open the block,
{
path = p;
int r = 0, i = 0;
+ uint64_t num_discard_threads = 0;
dout(1) << __func__ << " path " << path << dendl;
struct stat statbuf;
if (r < 0) {
goto out_fail;
}
- if (support_discard && cct->_conf->bdev_enable_discard && cct->_conf->bdev_async_discard) {
+
+ num_discard_threads = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
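+ // Zero threads disables async discard at open time; handle_conf_change()
+ // can still bring threads up later if the value is raised at runtime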
+ if (support_discard && cct->_conf->bdev_enable_discard && num_discard_threads > 0) {
_discard_start();
}
{
dout(1) << __func__ << dendl;
_aio_stop();
- if (discard_thread.is_started()) {
+ if (_discard_started()) {
_discard_stop();
}
_pre_close();
void KernelDevice::_discard_start()
{
- discard_thread.create("bstore_discard");
+ uint64_t num = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
+ dout(10) << __func__ << " starting " << num << " threads" << dendl;
+
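+ // Hold discard_lock while spawning so a concurrent handle_conf_change()
+ // never sees a partially populated discard_threads vector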
+ std::unique_lock l(discard_lock);
+
+ target_discard_threads = num;
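+ // Remember the requested pool size so handle_conf_change() can later
+ // compute how many threads to add or remove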
+ discard_threads.reserve(num);
+ for (uint64_t i = 0; i < num; i++) {
+ // All threads created with the same name
+ discard_threads.emplace_back(new DiscardThread(this, i));
+ discard_threads.back()->create("bstore_discard");
+ }
+
+ dout(10) << __func__ << " started " << num << " threads" << dendl;
}
void KernelDevice::_discard_stop()
{
dout(10) << __func__ << dendl;
+
+ // Signal threads to stop, then wait for them to join
{
std::unique_lock l(discard_lock);
- while (!discard_started) {
+ while (discard_threads.empty()) {
discard_cond.wait(l);
}
- discard_stop = true;
+
+ for (auto &t : discard_threads) {
+ t->stop = true;
+ }
+
discard_cond.notify_all();
}
- discard_thread.join();
- {
- std::lock_guard l(discard_lock);
- discard_stop = false;
- }
+
+ // The thread objects are held by shared_ptr, so joining them and then
+ // clearing the vector is all the cleanup we need
+ for (auto &t : discard_threads)
+ t->join();
+ discard_threads.clear();
+
dout(10) << __func__ << " stopped" << dendl;
}
+bool KernelDevice::_discard_started()
+{
+ std::unique_lock l(discard_lock);
+ return !discard_threads.empty();
+}
+
void KernelDevice::discard_drain()
{
dout(10) << __func__ << dendl;
{
// https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
#if defined(__linux__)
r == -EREMCHG || r == -EBADE
dout(10) << __func__ << " end" << dendl;
}
-void KernelDevice::_discard_thread()
+void KernelDevice::_discard_thread(uint64_t tid)
{
+ dout(10) << __func__ << " thread " << tid << " start" << dendl;
+
+ // Thread-local set of discards currently being processed
+ interval_set<uint64_t> discard_processing;
+
std::unique_lock l(discard_lock);
- ceph_assert(!discard_started);
- discard_started = true;
discard_cond.notify_all();
+
+ // Pin our thread object via a local shared_ptr so it stays alive even if
+ // handle_conf_change() erases it from the vector. Threads are only ever
+ // removed from the tail; if our slot is gone, we were told to stop.
+ if (tid >= discard_threads.size() || discard_threads[tid]->id != tid)
+ return;
+ auto thr = discard_threads[tid];
+
while (true) {
- ceph_assert(discard_finishing.empty());
+ ceph_assert(discard_processing.empty());
if (discard_queued.empty()) {
- if (discard_stop)
+ if (thr->stop)
break;
dout(20) << __func__ << " sleep" << dendl;
discard_cond.notify_all(); // for the thread trying to drain...
discard_cond.wait(l);
dout(20) << __func__ << " wake" << dendl;
} else {
- discard_finishing.swap(discard_queued);
+ // Swap the queued discards into a local set and process them here
+ // without worrying about fairness between threads. While this thread
+ // waits for its discards to complete, the other threads can pick up
+ // the next batch from the queue and do the same, so more discards are
+ // issued in parallel, which is the goal.
+ discard_processing.swap(discard_queued);
discard_running = true;
l.unlock();
dout(20) << __func__ << " finishing" << dendl;
- for (auto p = discard_finishing.begin();p != discard_finishing.end(); ++p) {
- _discard(p.get_start(), p.get_len());
+ for (auto p = discard_processing.begin(); p != discard_processing.end(); ++p) {
+ _discard(p.get_start(), p.get_len());
}
- discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
- discard_finishing.clear();
+ discard_callback(discard_callback_priv, static_cast<void*>(&discard_processing));
+ discard_processing.clear();
l.lock();
discard_running = false;
}
}
- dout(10) << __func__ << " finish" << dendl;
- discard_started = false;
+
+ dout(10) << __func__ << " thread " << tid << " finish" << dendl;
}
int KernelDevice::_queue_discard(interval_set<uint64_t> &to_release)
{
// If bdev_async_discard is enabled on the fly, the discard threads may not
// have been started yet; return -1 so the caller falls back to sync discard
- if (!discard_thread.is_started())
+ if (!_discard_started())
return -1;
if (to_release.empty())
std::lock_guard l(discard_lock);
discard_queued.insert(to_release);
- discard_cond.notify_all();
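+ // A single queued batch only needs one worker; the drain and shutdown
+ // paths use notify_all instead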
+ discard_cond.notify_one();
return 0;
}
if (!support_discard || !cct->_conf->bdev_enable_discard)
return false;
- if (async && discard_thread.is_started()) {
+ if (async && _discard_started()) {
return 0 == _queue_discard(to_release);
} else {
for (auto p = to_release.begin(); p != to_release.end(); ++p) {
}
return r;
}
+
+const char** KernelDevice::get_tracked_conf_keys() const
+{
+ static const char* KEYS[] = {
+ "bdev_async_discard_threads",
+ NULL
+ };
+ return KEYS;
+}
+
+void KernelDevice::handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed)
+{
+ if (changed.count("bdev_async_discard_threads")) {
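+ // Invoked when the option changes at runtime, e.g. via
+ // "ceph config set osd bdev_async_discard_threads 4"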
+ std::unique_lock l(discard_lock);
+
+ uint64_t oldval = target_discard_threads;
+ uint64_t newval = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
+
+ target_discard_threads = newval;
+
+ // Increase? Spawn now, it's quick
+ if (newval > oldval) {
+ dout(10) << __func__ << " starting " << (newval - oldval) << " additional discard threads" << dendl;
+ discard_threads.reserve(target_discard_threads);
+ for (uint64_t i = oldval; i < newval; i++) {
+ // All threads created with the same name
+ discard_threads.emplace_back(new DiscardThread(this, i));
+ discard_threads.back()->create("bstore_discard");
+ }
+ } else if (newval < oldval) {
+ // Decrease? Signal the excess threads to stop; they exit on their own
+ dout(10) << __func__ << " stopping " << (oldval - newval) << " existing discard threads" << dendl;
+
+ // Signal the tail threads to quit and stop tracking them. Loop on a
+ // count rather than a descending index so that newval == 0 cannot
+ // underflow the unsigned counter.
+ for (uint64_t i = oldval; i > newval; i--) {
+ // Also detach the thread so we no longer need to join it
+ discard_threads[i - 1]->stop = true;
+ discard_threads[i - 1]->detach();
+ discard_threads.erase(discard_threads.begin() + (i - 1));
+ }
+
+ discard_cond.notify_all();
+ }
+ }
+}
#include "include/types.h"
#include "include/interval_set.h"
+#include "common/config_obs.h"
#include "common/Thread.h"
#include "include/utime.h"
#define RW_IO_MAX (INT_MAX & CEPH_PAGE_MASK)
-class KernelDevice : public BlockDevice {
+class KernelDevice : public BlockDevice,
+ public md_config_obs_t {
protected:
std::string path;
private:
aio_callback_t discard_callback;
void *discard_callback_priv;
bool aio_stop;
- bool discard_started;
- bool discard_stop;
ceph::mutex discard_lock = ceph::make_mutex("KernelDevice::discard_lock");
ceph::condition_variable discard_cond;
bool discard_running = false;
interval_set<uint64_t> discard_queued;
- interval_set<uint64_t> discard_finishing;
struct AioCompletionThread : public Thread {
KernelDevice *bdev;
struct DiscardThread : public Thread {
KernelDevice *bdev;
- explicit DiscardThread(KernelDevice *b) : bdev(b) {}
+ const uint64_t id;
+ bool stop = false;
+ explicit DiscardThread(KernelDevice *b, uint64_t id) : bdev(b), id(id) {}
void *entry() override {
- bdev->_discard_thread();
+ bdev->_discard_thread(id);
return NULL;
}
- } discard_thread;
+ };
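+ // Tracked by shared_ptr so a DiscardThread object outlives its vector
+ // slot when handle_conf_change() shrinks the pool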
+ std::vector<std::shared_ptr<DiscardThread>> discard_threads;
+ uint64_t target_discard_threads = 0;
std::atomic_int injecting_crash;
virtual void _pre_close() { } // hook for child implementations
void _aio_thread();
- void _discard_thread();
+ void _discard_thread(uint64_t tid);
int _queue_discard(interval_set<uint64_t> &to_release);
bool try_discard(interval_set<uint64_t> &to_release, bool async = true) override;
void _discard_start();
void _discard_stop();
+ bool _discard_started();
void _aio_log_start(IOContext *ioc, uint64_t offset, uint64_t length);
void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length);
public:
KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv);
+ ~KernelDevice();
void aio_submit(IOContext *ioc) override;
void discard_drain() override;
int invalidate_cache(uint64_t off, uint64_t len) override;
int open(const std::string& path) override;
void close() override;
+
+ // config observer bits
+ const char** get_tracked_conf_keys() const override;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed) override;
};
#endif