#include <functional>
#include <map>
#include <thread>
-#include <xmmintrin.h>
#include <spdk/nvme.h>
-#include <rte_lcore.h>
-
#include "include/stringify.h"
#include "include/types.h"
#include "include/compat.h"
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << sn << ") "
-static constexpr uint16_t data_buffer_default_num = 2048;
+thread_local SharedDriverQueueData *queue_t;
+
+static constexpr uint16_t data_buffer_default_num = 1024;
static constexpr uint32_t data_buffer_size = 8192;
static constexpr uint16_t inline_segment_num = 32;
-static thread_local int queue_id = -1;
-
enum {
l_bluestore_nvmedevice_first = 632430,
l_bluestore_nvmedevice_write_lat,
static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
-static int dpdk_thread_adaptor(void *f)
-{
- (*static_cast<std::function<void ()>*>(f))();
- return 0;
-}
-
struct IORequest {
uint16_t cur_seg_idx = 0;
uint16_t nseg;
};
struct Task;
+
+class SharedDriverData {
+ unsigned id;
+ std::string sn;
+ spdk_nvme_ctrlr *ctrlr;
+ spdk_nvme_ns *ns;
+ uint64_t block_size = 0;
+ uint32_t sector_size = 0;
+ uint64_t size = 0;
+
+ public:
+ std::vector<NVMEDevice*> registered_devices;
+ friend class SharedDriverQueueData;
+ SharedDriverData(unsigned _id, const std::string &sn_tag,
+ spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
+ : id(_id),
+ sn(sn_tag),
+ ctrlr(c),
+ ns(ns) {
+ sector_size = spdk_nvme_ns_get_sector_size(ns);
+ block_size = std::max(CEPH_PAGE_SIZE, sector_size);
+ size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
+ }
+
+ bool is_equal(const string &tag) const { return sn == tag; }
+ ~SharedDriverData() {
+ }
+
+ void register_device(NVMEDevice *device) {
+ registered_devices.push_back(device);
+ }
+
+ void remove_device(NVMEDevice *device) {
+ std::vector<NVMEDevice*> new_devices;
+ for (auto &&it : registered_devices) {
+ if (it != device)
+ new_devices.push_back(it);
+ }
+ registered_devices.swap(new_devices);
+ }
+
+ uint64_t get_block_size() {
+ return block_size;
+ }
+ uint64_t get_size() {
+ return size;
+ }
+};
+
class SharedDriverQueueData {
+ NVMEDevice *bdev;
SharedDriverData *driver;
spdk_nvme_ctrlr *ctrlr;
spdk_nvme_ns *ns;
std::string sn;
uint64_t block_size;
uint32_t sector_size;
- uint32_t core_id;
- uint32_t queueid;
uint32_t max_queue_depth;
struct spdk_nvme_qpair *qpair;
- std::function<void ()> run_func;
-
- bool aio_stop = false;
- void _aio_thread();
+ bool reap_io = false;
int alloc_buf_from_pool(Task *t, bool write);
- std::atomic_bool queue_empty;
- Mutex queue_lock;
- Cond queue_cond;
- std::queue<Task*> task_queue;
-
- Mutex flush_lock;
- Cond flush_cond;
- std::atomic_int flush_waiters;
- std::set<uint64_t> flush_waiter_seqs;
-
public:
uint32_t current_queue_depth = 0;
std::atomic_ulong completed_op_seq, queue_op_seq;
std::vector<void*> data_buf_mempool;
PerfCounters *logger = nullptr;
+ void _aio_handle(Task *t, IOContext *ioc);
+
+ SharedDriverQueueData(NVMEDevice *bdev, SharedDriverData *driver)
+ : bdev(bdev),
+ driver(driver) {
+ ctrlr = driver->ctrlr;
+ ns = driver->ns;
+ block_size = driver->block_size;
+ sector_size = driver->sector_size;
- SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
- const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
- : driver(driver),
- ctrlr(c),
- ns(ns),
- sn(sn_tag),
- block_size(block_size),
- sector_size(sector_size),
- core_id(core),
- queueid(queue_id),
- run_func([this]() { _aio_thread(); }),
- queue_empty(false),
- queue_lock("NVMEDevice::queue_lock"),
- flush_lock("NVMEDevice::flush_lock"),
- flush_waiters(0),
- completed_op_seq(0), queue_op_seq(0) {
struct spdk_nvme_io_qpair_opts opts = {};
spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
opts.qprio = SPDK_NVME_QPRIO_URGENT;
qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts));
assert(qpair != NULL);
+ // allocate spdk dma memory
+ for (uint16_t i = 0; i < data_buffer_default_num; i++) {
+ void *b = spdk_dma_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
+ if (!b) {
+ derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
+ assert(b);
+ }
+ data_buf_mempool.push_back(b);
+ }
+
PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
b.add_time_avg(l_bluestore_nvmedevice_write_lat, "write_lat", "Average write completing latency");
b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
logger = b.create_perf_counters();
g_ceph_context->get_perfcounters_collection()->add(logger);
- }
-
- void queue_task(Task *t, uint64_t ops = 1) {
- queue_op_seq += ops;
- Mutex::Locker l(queue_lock);
- task_queue.push(t);
- if (queue_empty.load()) {
- queue_empty = false;
- queue_cond.Signal();
- }
- }
-
- void flush_wait() {
- uint64_t cur_seq = queue_op_seq.load();
- uint64_t left = cur_seq - completed_op_seq.load();
- if (cur_seq > completed_op_seq) {
- // TODO: this may contains read op
- dout(10) << __func__ << " existed inflight ops " << left << dendl;
- Mutex::Locker l(flush_lock);
- ++flush_waiters;
- flush_waiter_seqs.insert(cur_seq);
- while (cur_seq > completed_op_seq.load()) {
- flush_cond.Wait(flush_lock);
- }
- flush_waiter_seqs.erase(cur_seq);
- --flush_waiters;
- }
- }
-
- void start() {
- int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
- core_id);
- assert(r == 0);
-
- }
-
- void stop() {
- {
- Mutex::Locker l(queue_lock);
- aio_stop = true;
- queue_cond.Signal();
- }
- int r = rte_eal_wait_lcore(core_id);
- assert(r == 0);
- aio_stop = false;
+ bdev->queue_number++;
+ if(bdev->queue_number.load() == 1)
+ reap_io = true;
}
~SharedDriverQueueData() {
g_ceph_context->get_perfcounters_collection()->remove(logger);
if (!qpair) {
- spdk_nvme_ctrlr_free_io_qpair(qpair);
+ spdk_nvme_ctrlr_free_io_qpair(qpair);
+ bdev->queue_number--;
}
- delete logger;
- }
-};
-
-class SharedDriverData {
- unsigned id;
- uint32_t core_id;
-
- std::string sn;
- spdk_nvme_ctrlr *ctrlr;
- spdk_nvme_ns *ns;
- uint64_t block_size = 0;
- uint32_t sector_size = 0;
- uint64_t size = 0;
- uint32_t queue_number;
- std::vector<SharedDriverQueueData*> queues;
-
- void _aio_start() {
- for (auto &&it : queues)
- it->start();
- }
- void _aio_stop() {
- for (auto &&it : queues)
- it->stop();
- }
-
- public:
- std::vector<NVMEDevice*> registered_devices;
- SharedDriverData(unsigned _id, const std::string &sn_tag,
- spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
- : id(_id),
- sn(sn_tag),
- ctrlr(c),
- ns(ns) {
- int i;
- sector_size = spdk_nvme_ns_get_sector_size(ns);
- block_size = std::max(CEPH_PAGE_SIZE, sector_size);
- size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
-
- RTE_LCORE_FOREACH_SLAVE(i) {
- queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
- }
-
- _aio_start();
- }
- bool is_equal(const string &tag) const { return sn == tag; }
- ~SharedDriverData() {
- for (auto p : queues) {
- delete p;
- }
- }
-
- SharedDriverQueueData *get_queue(uint32_t i) {
- return queues.at(i%queue_number);
- }
-
- void register_device(NVMEDevice *device) {
- // in case of registered_devices, we stop thread now.
- // Because release is really a rare case, we could bear this
- _aio_stop();
- registered_devices.push_back(device);
- _aio_start();
- }
-
- void remove_device(NVMEDevice *device) {
- _aio_stop();
- std::vector<NVMEDevice*> new_devices;
- for (auto &&it : registered_devices) {
- if (it != device)
- new_devices.push_back(it);
+ // free all spdk dma memory;
+ if (!data_buf_mempool.empty()) {
+ for (uint16_t i = 0; i < data_buffer_default_num; i++) {
+ void *b = data_buf_mempool[i];
+ assert(b);
+ spdk_dma_free(b);
+ }
+ data_buf_mempool.clear();
}
- registered_devices.swap(new_devices);
- _aio_start();
- }
- uint64_t get_block_size() {
- return block_size;
- }
- uint64_t get_size() {
- return size;
+ delete logger;
}
};
copied += need_copy;
}
}
-
- void io_wait() {
- std::unique_lock<std::mutex> l(lock);
- cond.wait(l);
- }
-
- void io_wake() {
- std::lock_guard<std::mutex> l(lock);
- cond.notify_all();
- }
};
static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
return 0;
}
-void SharedDriverQueueData::_aio_thread()
+void SharedDriverQueueData::_aio_handle(Task *t, IOContext *ioc)
{
- dout(1) << __func__ << " start" << dendl;
-
- if (data_buf_mempool.empty()) {
- for (uint16_t i = 0; i < data_buffer_default_num; i++) {
- void *b = spdk_dma_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
- if (!b) {
- derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
- assert(b);
- }
- data_buf_mempool.push_back(b);
- }
- }
+ dout(20) << __func__ << " start" << dendl;
- Task *t = nullptr;
int r = 0;
uint64_t lba_off, lba_count;
ceph::coarse_real_clock::time_point cur, start
= ceph::coarse_real_clock::now();
- while (true) {
- bool inflight = queue_op_seq.load() - completed_op_seq.load();
+ while (ioc->num_running) {
again:
dout(40) << __func__ << " polling" << dendl;
- if (inflight) {
+ if (current_queue_depth) {
spdk_nvme_qpair_process_completions(qpair, g_conf->bluestore_spdk_max_io_completion);
}
}
current_queue_depth++;
}
-
- if (!queue_empty.load()) {
- Mutex::Locker l(queue_lock);
- if (!task_queue.empty()) {
- t = task_queue.front();
- task_queue.pop();
- logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
- }
- if (!t)
- queue_empty = true;
- } else {
- if (flush_waiters.load()) {
- Mutex::Locker l(flush_lock);
- if (*flush_waiter_seqs.begin() <= completed_op_seq.load())
- flush_cond.Signal();
- }
-
- if (!inflight) {
- // be careful, here we need to let each thread reap its own, currently it is done
- // by only one dedicatd dpdk thread
- if(!queueid) {
- for (auto &&it : driver->registered_devices)
- it->reap_ioc();
- }
-
- Mutex::Locker l(queue_lock);
- if (queue_empty.load()) {
- cur = ceph::coarse_real_clock::now();
- auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
- logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
- if (aio_stop)
- break;
- queue_cond.Wait(queue_lock);
- start = ceph::coarse_real_clock::now();
- }
- }
- }
+ cur = ceph::coarse_real_clock::now();
+ auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
+ logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
+ start = ceph::coarse_real_clock::now();
}
- assert(data_buf_mempool.size() == data_buffer_default_num);
- dout(1) << __func__ << " end" << dendl;
+
+ if(reap_io)
+ bdev->reap_ioc();
+ dout(20) << __func__ << " end" << dendl;
}
#define dout_subsys ceph_subsys_bdev
}
}
- // at least two cores are needed for using spdk
- if (core_num < 2) {
+ // at least one core is needed for using spdk
+ if (core_num < 1) {
r = -ENOENT;
- derr << __func__ << " invalid spdk coremask, at least two cores are needed: "
+ derr << __func__ << " invalid spdk coremask, at least one core is needed: "
<< cpp_strerror(r) << dendl;
return r;
}
assert(queue != NULL);
assert(ctx != NULL);
- ++queue->completed_op_seq;
--queue->current_queue_depth;
auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
ceph::coarse_real_clock::now() - task->start);
{
}
-
int NVMEDevice::open(const string& p)
{
int r = 0;
<< " block_size " << block_size << " (" << pretty_si_t(block_size)
<< "B)" << dendl;
+
return 0;
}
int NVMEDevice::flush()
{
- dout(10) << __func__ << " start" << dendl;
- auto start = ceph::coarse_real_clock::now();
-
- if(queue_id == -1)
- queue_id = ceph_gettid();
- SharedDriverQueueData *queue = driver->get_queue(queue_id);
- assert(queue != NULL);
- queue->flush_wait();
- auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
- ceph::coarse_real_clock::now() - start);
- queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
return 0;
}
ioc->num_pending -= pending;
assert(ioc->num_pending.load() == 0); // we should be only thread doing this
// Only need to push the first entry
- if (queue_id == -1)
- queue_id = ceph_gettid();
ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
- driver->get_queue(queue_id)->queue_task(t, pending);
+ if(!queue_t)
+ queue_t = new SharedDriverQueueData(this, driver);
+ queue_t->_aio_handle(t, ioc);
}
}