// NVMe driver is loaded while osd is running.
OPTION(bdev_nvme_unbind_from_kernel, OPT_BOOL)
OPTION(bdev_nvme_retry_count, OPT_INT) // -1 means by default which is 4
+OPTION(bdev_enable_discard, OPT_BOOL)
+OPTION(bdev_async_discard, OPT_BOOL)
OPTION(objectstore_blackhole, OPT_BOOL)
.set_default(-1)
.set_description(""),
+ Option("bdev_enable_discard", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
+ .set_default(false)
+ .set_description("Enable discarding of freed extents on the underlying block device"),
+
+ Option("bdev_async_discard", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
+ .set_default(false)
+ .set_description("Perform discards asynchronously from a background thread instead of inline"),
+
Option("bluefs_alloc_size", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(1_M)
.set_description(""),
}
BlockDevice *BlockDevice::create(CephContext* cct, const string& path,
- aio_callback_t cb, void *cbpriv)
+ aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
{
string type = "kernel";
char buf[PATH_MAX + 1];
#endif
#if defined(HAVE_LIBAIO)
if (type == "kernel") {
- return new KernelDevice(cct, cb, cbpriv);
+ return new KernelDevice(cct, cb, cbpriv, d_cb, d_cbpriv);
}
#endif
#if defined(HAVE_SPDK)
virtual ~BlockDevice() = default;
static BlockDevice *create(
- CephContext* cct, const std::string& path, aio_callback_t cb, void *cbpriv);
+ CephContext* cct, const std::string& path, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv);
virtual bool supported_bdev_label() { return true; }
virtual bool is_rotational() { return rotational; }
bool buffered) = 0;
virtual int flush() = 0;
virtual int discard(uint64_t offset, uint64_t len) { return 0; }
+ // queue extents for asynchronous discard; returns -1 if unsupported
+ virtual int queue_discard(interval_set<uint64_t> &to_release) { return -1; }
+ // wait for any queued discards to complete
+ virtual void discard_drain() { return; }
void queue_reap_ioc(IOContext *ioc);
void reap_ioc();
MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileReader, bluefs_file_reader, bluefs);
MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileLock, bluefs_file_lock, bluefs);
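+// Per-device discard completion callbacks.  They run in the KernelDevice
+// discard thread and hand the trimmed extents back to the matching
+// allocator via BlueFS::handle_discard().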
+static void wal_discard_cb(void *priv, void* priv2) {
+ BlueFS *bluefs = static_cast<BlueFS*>(priv);
+ interval_set<uint64_t> *tmp = static_cast<interval_set<uint64_t>*>(priv2);
+ bluefs->handle_discard(BlueFS::BDEV_WAL, *tmp);
+}
+
+static void db_discard_cb(void *priv, void* priv2) {
+ BlueFS *bluefs = static_cast<BlueFS*>(priv);
+ interval_set<uint64_t> *tmp = static_cast<interval_set<uint64_t>*>(priv2);
+ bluefs->handle_discard(BlueFS::BDEV_DB, *tmp);
+}
+
+static void slow_discard_cb(void *priv, void* priv2) {
+ BlueFS *bluefs = static_cast<BlueFS*>(priv);
+ interval_set<uint64_t> *tmp = static_cast<interval_set<uint64_t>*>(priv2);
+ bluefs->handle_discard(BlueFS::BDEV_SLOW, *tmp);
+}
BlueFS::BlueFS(CephContext* cct)
: cct(cct),
ioc(MAX_BDEV),
block_all(MAX_BDEV)
{
+ discard_cb[BDEV_WAL] = wal_discard_cb;
+ discard_cb[BDEV_DB] = db_discard_cb;
+ discard_cb[BDEV_SLOW] = slow_discard_cb;
}
BlueFS::~BlueFS()
dout(10) << __func__ << " bdev " << id << " path " << path << dendl;
assert(id < bdev.size());
assert(bdev[id] == NULL);
- BlockDevice *b = BlockDevice::create(cct, path, NULL, NULL);
+ BlockDevice *b = BlockDevice::create(cct, path, NULL, NULL, discard_cb[id], static_cast<void*>(this));
int r = b->open(path);
if (r < 0) {
delete b;
return 0;
}
+void BlueFS::handle_discard(unsigned id, interval_set<uint64_t>& to_release)
+{
+ dout(10) << __func__ << " bdev " << id << dendl;
+ assert(alloc[id]);
+ alloc[id]->release(to_release);
+}
+
uint64_t BlueFS::get_fs_usage()
{
std::lock_guard<std::mutex> l(lock);
void BlueFS::_stop_alloc()
{
dout(20) << __func__ << dendl;
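+ // drain any in-flight discards first: their callbacks release extents
+ // into alloc[], which is about to be shut down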
+ for (auto p : bdev) {
+ if (p)
+ p->discard_drain();
+ }
+
for (auto p : alloc) {
if (p != nullptr) {
p->shutdown();
for (unsigned i = 0; i < to_release.size(); ++i) {
if (!to_release[i].empty()) {
/* OK, now we have the guarantee alloc[i] won't be null. */
- for (auto p = to_release[i].begin(); p != to_release[i].end(); ++p) {
- bdev[i]->discard(p.get_start(), p.get_len());
+ int r = 0;
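+ // async path: queue the whole interval set on the device and let
+ // handle_discard() release the extents once they have been trimmed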
+ if (cct->_conf->bdev_enable_discard && cct->_conf->bdev_async_discard) {
+ r = bdev[i]->queue_discard(to_release[i]);
+ if (r == 0)
+ continue;
+ } else if (cct->_conf->bdev_enable_discard) {
+ for (auto p = to_release[i].begin(); p != to_release[i].end(); ++p) {
+ bdev[i]->discard(p.get_start(), p.get_len());
+ }
}
alloc[i]->release(to_release[i]);
}
vector<Allocator*> alloc; ///< allocators for bdevs
vector<interval_set<uint64_t>> pending_release; ///< extents to release
+ BlockDevice::aio_callback_t discard_cb[3]; // discard callbacks for wal, db, and slow bdevs
+
void _init_logger();
void _shutdown_logger();
void _update_logger_stats();
int reclaim_blocks(unsigned bdev, uint64_t want,
PExtentVector *extents);
+ // handler for discard event
+ void handle_discard(unsigned dev, interval_set<uint64_t>& to_release);
+
void flush(FileWriter *h) {
std::lock_guard<std::mutex> l(lock);
_flush(h, false);
c->aio_finish(store);
}
+static void discard_cb(void *priv, void *priv2)
+{
+ BlueStore *store = static_cast<BlueStore*>(priv);
+ interval_set<uint64_t> *tmp = static_cast<interval_set<uint64_t>*>(priv2);
+ store->handle_discard(*tmp);
+}
+
+void BlueStore::handle_discard(interval_set<uint64_t>& to_release)
+{
+ dout(10) << __func__ << dendl;
+ assert(alloc);
+ alloc->release(to_release);
+}
+
BlueStore::BlueStore(CephContext *cct, const string& path)
: ObjectStore(cct, path),
throttle_bytes(cct, "bluestore_throttle_bytes",
{
assert(bdev == NULL);
string p = path + "/block";
- bdev = BlockDevice::create(cct, p, aio_cb, static_cast<void*>(this));
+ bdev = BlockDevice::create(cct, p, aio_cb, static_cast<void*>(this), discard_cb, static_cast<void*>(this));
int r = bdev->open(p);
if (r < 0)
goto fail;
void BlueStore::_close_alloc()
{
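+ // let any queued discards finish before the allocator is destroyed,
+ // since their completion callback releases extents back into it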
+ assert(bdev);
+ bdev->discard_drain();
+
assert(alloc);
alloc->shutdown();
delete alloc;
{
// it's expected we're called with lazy_release_lock already taken!
if (likely(!cct->_conf->bluestore_debug_no_reuse_blocks)) {
- for (interval_set<uint64_t>::iterator p = txc->released.begin();
- p != txc->released.end();
- ++p) {
- bdev->discard(p.get_start(), p.get_len());
+ int r = 0;
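+ // async path: on success the extents are released later from
+ // handle_discard(), so skip the immediate release below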
+ if (cct->_conf->bdev_enable_discard && cct->_conf->bdev_async_discard) {
+ r = bdev->queue_discard(txc->released);
+ if (r == 0) {
+ dout(10) << __func__ << "(queued) " << txc << " " << std::hex
+ << txc->released << std::dec << dendl;
+ goto out;
+ }
+ } else if (cct->_conf->bdev_enable_discard) {
+ for (auto p = txc->released.begin(); p != txc->released.end(); ++p) {
+ bdev->discard(p.get_start(), p.get_len());
+ }
}
- dout(10) << __func__ << " " << txc << " " << std::hex
+ dout(10) << __func__ << "(sync) " << txc << " " << std::hex
<< txc->released << std::dec << dendl;
alloc->release(txc->released);
}
+
+out:
txc->allocated.clear();
txc->released.clear();
}
void handle_conf_change(const struct md_config_t *conf,
const std::set<std::string> &changed) override;
+ // handler for discard event
+ void handle_discard(interval_set<uint64_t>& to_release);
+
void _set_csum();
void _set_compression();
void _set_throttle_params();
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
-KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
+KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
: BlockDevice(cct, cb, cbpriv),
fd_direct(-1),
fd_buffered(-1),
fs(NULL), aio(false), dio(false),
debug_lock("KernelDevice::debug_lock"),
aio_queue(cct->_conf->bdev_aio_max_queue_depth),
+ discard_callback(d_cb),
+ discard_callback_priv(d_cbpriv),
aio_stop(false),
+ discard_started(false),
+ discard_stop(false),
aio_thread(this),
+ discard_thread(this),
injecting_crash(0)
{
}
if (r < 0) {
goto out_fail;
}
+ _discard_start();
fs = FS::create_by_fd(fd_direct);
assert(fs);
{
dout(1) << __func__ << dendl;
_aio_stop();
+ _discard_stop();
assert(fs);
delete fs;
}
}
+int KernelDevice::_discard_start()
+{
+ discard_thread.create("bstore_discard");
+ return 0;
+}
+
+void KernelDevice::_discard_stop()
+{
+ dout(10) << __func__ << dendl;
+ {
+ std::unique_lock<std::mutex> l(discard_lock);
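+ // wait until the discard thread has actually started before telling it to stop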
+ while (!discard_started) {
+ discard_cond.wait(l);
+ }
+ discard_stop = true;
+ discard_cond.notify_all();
+ }
+ discard_thread.join();
+ {
+ std::lock_guard<std::mutex> l(discard_lock);
+ discard_stop = false;
+ }
+ dout(10) << __func__ << " stopped" << dendl;
+}
+
+void KernelDevice::discard_drain()
+{
+ dout(10) << __func__ << dendl;
+ std::unique_lock<std::mutex> l(discard_lock);
+ while (!discard_queued.empty() || discard_running) {
+ discard_cond.wait(l);
+ }
+}
+
void KernelDevice::_aio_thread()
{
dout(10) << __func__ << " start" << dendl;
dout(10) << __func__ << " end" << dendl;
}
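+// Background discard thread: batches queued ranges, trims them with the
+// synchronous discard() call, then reports the finished set through
+// discard_callback so the owner can release the extents.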
+void KernelDevice::_discard_thread()
+{
+ std::unique_lock<std::mutex> l(discard_lock);
+ assert(!discard_started);
+ discard_started = true;
+ discard_cond.notify_all();
+ while (true) {
+ assert(discard_finishing.empty());
+ if (discard_queued.empty()) {
+ if (discard_stop)
+ break;
+ dout(20) << __func__ << " sleep" << dendl;
+ discard_cond.notify_all(); // for the thread trying to drain...
+ discard_cond.wait(l);
+ dout(20) << __func__ << " wake" << dendl;
+ } else {
+ discard_finishing.swap(discard_queued);
+ discard_running = true;
+ l.unlock();
+ dout(20) << __func__ << " finishing" << dendl;
+ for (auto p = discard_finishing.begin(); p != discard_finishing.end(); ++p) {
+ discard(p.get_start(), p.get_len());
+ }
+
+ discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
+ discard_finishing.clear();
+ l.lock();
+ discard_running = false;
+ }
+ }
+ dout(10) << __func__ << " finish" << dendl;
+ discard_started = false;
+}
+
+int KernelDevice::queue_discard(interval_set<uint64_t> &to_release)
+{
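+ // async discard is only offered on non-rotational devices; -1 tells the
+ // caller to fall back to handling the extents itself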
+ if (rotational)
+ return -1;
+
+ if (to_release.empty())
+ return 0;
+
+ std::lock_guard<std::mutex> l(discard_lock);
+ discard_queued.insert(to_release);
+ discard_cond.notify_all();
+ return 0;
+}
+
void KernelDevice::_aio_log_start(
IOContext *ioc,
uint64_t offset,
std::mutex flush_mutex;
aio_queue_t aio_queue;
+ aio_callback_t discard_callback;
+ void *discard_callback_priv;
bool aio_stop;
+ bool discard_started;
+ bool discard_stop;
+
+ std::mutex discard_lock;
+ std::condition_variable discard_cond;
+ bool discard_running = false;
+ interval_set<uint64_t> discard_queued;
+ interval_set<uint64_t> discard_finishing;
struct AioCompletionThread : public Thread {
KernelDevice *bdev;
}
} aio_thread;
+ struct DiscardThread : public Thread {
+ KernelDevice *bdev;
+ explicit DiscardThread(KernelDevice *b) : bdev(b) {}
+ void *entry() override {
+ bdev->_discard_thread();
+ return NULL;
+ }
+ } discard_thread;
+
std::atomic_int injecting_crash;
void _aio_thread();
+ void _discard_thread();
+ int queue_discard(interval_set<uint64_t> &to_release) override;
+
int _aio_start();
void _aio_stop();
+ int _discard_start();
+ void _discard_stop();
+
void _aio_log_start(IOContext *ioc, uint64_t offset, uint64_t length);
void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length);
void debug_aio_unlink(aio_t& aio);
public:
- KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv);
+ KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv);
void aio_submit(IOContext *ioc) override;
+ void discard_drain() override;
int collect_metadata(const std::string& prefix, map<std::string,std::string> *pm) const override;
int get_devname(std::string *s) override {