#include "common/errno.h"
#include "common/debug.h"
#include "common/perf_counters.h"
+#include "common/io_priority.h"
#include "NVMEDevice.h"
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << sn << ") "
-std::vector<void*> data_buf_mempool;
-
static constexpr uint16_t data_buffer_default_num = 2048;
static constexpr uint32_t data_buffer_size = 8192;
static constexpr uint16_t inline_segment_num = 32;
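+// thread-local queue index: lazily set to the thread's tid so each
+// submitting thread maps to a fixed SharedDriverQueueData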
+static thread_local int queue_id = -1;
+
enum {
l_bluestore_nvmedevice_first = 632430,
l_bluestore_nvmedevice_aio_write_lat,
void **extra_segs = nullptr;
};
-struct Task {
- NVMEDevice *device;
- IOContext *ctx = nullptr;
- IOCommand command;
- uint64_t offset;
- uint64_t len;
- bufferlist write_bl;
- std::function<void()> fill_cb;
- Task *next = nullptr;
- int64_t return_code;
- ceph::coarse_real_clock::time_point start;
- IORequest io_request;
- std::mutex lock;
- std::condition_variable cond;
- Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
- : device(dev), command(c), offset(off), len(l),
- return_code(rc),
- start(ceph::coarse_real_clock::now()) {}
- ~Task() {
- assert(!io_request.nseg);
- }
- void release_segs() {
- if (io_request.extra_segs) {
- for (uint16_t i = 0; i < io_request.nseg; i++)
- data_buf_mempool.push_back(io_request.extra_segs[i]);
- delete io_request.extra_segs;
- } else if (io_request.nseg) {
- for (uint16_t i = 0; i < io_request.nseg; i++)
- data_buf_mempool.push_back(io_request.inline_segs[i]);
- }
- io_request.nseg = 0;
- }
-
- void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
- uint64_t copied = 0;
- uint64_t left = len;
- void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs;
- uint16_t i = 0;
- while (left > 0) {
- char *src = static_cast<char*>(segs[i++]);
- uint64_t need_copy = std::min(left, data_buffer_size-off);
- memcpy(buf+copied, src+off, need_copy);
- off = 0;
- left -= need_copy;
- copied += need_copy;
- }
- }
-
- void io_wait() {
- std::unique_lock<std::mutex> l(lock);
- cond.wait(l);
- }
-
- void io_wake() {
- std::lock_guard<std::mutex> l(lock);
- cond.notify_all();
- }
-};
-
-class SharedDriverData {
- unsigned id;
- uint32_t core_id;
- std::string sn;
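+// SharedDriverQueueData: per-lcore I/O state. Each instance owns its own
+// SPDK qpair, task queue and data buffer pool, so submitting threads on
+// different cores never contend on a single per-device lock.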
+class SharedDriverData;
+struct Task;
+
+class SharedDriverQueueData {
+ SharedDriverData *driver;
spdk_nvme_ctrlr *ctrlr;
spdk_nvme_ns *ns;
+ std::string sn;
+ uint64_t block_size;
+ uint32_t sector_size;
+ uint32_t core_id;
+ uint32_t queueid;
struct spdk_nvme_qpair *qpair;
std::function<void ()> run_func;
-
- uint64_t block_size = 0;
- uint32_t sector_size = 0;
- uint64_t size = 0;
- std::vector<NVMEDevice*> registered_devices;
friend class AioCompletionThread;
bool aio_stop = false;
void _aio_thread();
- void _aio_start() {
- int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
- core_id);
- assert(r == 0);
- }
- void _aio_stop() {
- {
- Mutex::Locker l(queue_lock);
- aio_stop = true;
- queue_cond.Signal();
- }
- int r = rte_eal_wait_lcore(core_id);
- assert(r == 0);
- aio_stop = false;
- }
+ int alloc_buf_from_pool(Task *t, bool write);
+
std::atomic_bool queue_empty;
Mutex queue_lock;
Cond queue_cond;
std::atomic_int flush_waiters;
std::set<uint64_t> flush_waiter_seqs;
- public:
- std::atomic_ulong completed_op_seq, queue_op_seq;
- PerfCounters *logger = nullptr;
+ public:
+ std::atomic_ulong completed_op_seq, queue_op_seq;
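+ // per-queue buffer pool; replaces the old global data_buf_mempool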
+ std::vector<void*> data_buf_mempool;
+ PerfCounters *logger = nullptr;
- SharedDriverData(unsigned i, uint32_t core, const std::string &sn_tag,
- spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
- : id(i),
- core_id(core),
- sn(sn_tag),
+ SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
+ const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
+ : driver(driver),
ctrlr(c),
ns(ns),
+ sn(sn_tag),
+ block_size(block_size),
+ sector_size(sector_size),
+ core_id(core),
+ queueid(queue_id),
run_func([this]() { _aio_thread(); }),
queue_empty(false),
queue_lock("NVMEDevice::queue_lock"),
flush_waiters(0),
completed_op_seq(0), queue_op_seq(0) {
- sector_size = spdk_nvme_ns_get_sector_size(ns);
- block_size = std::max(CEPH_PAGE_SIZE, sector_size);
- size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
- qpair = spdk_nvme_ctrlr_alloc_io_qpair(c, SPDK_NVME_QPRIO_URGENT);
-
+ qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, SPDK_NVME_QPRIO_URGENT);
PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency");
b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
logger = b.create_perf_counters();
g_ceph_context->get_perfcounters_collection()->add(logger);
- _aio_start();
+ }
+
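+ // push a chain of tasks onto this queue and wake the polling thread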
+ void queue_task(Task *t, uint64_t ops = 1) {
+ queue_op_seq += ops;
+ Mutex::Locker l(queue_lock);
+ task_queue.push(t);
+ if (queue_empty.load()) {
+ queue_empty = false;
+ queue_cond.Signal();
+ }
}
- ~SharedDriverData() {
+
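+ // wait until every op queued on this queue before this call completes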
+ void flush_wait() {
+ uint64_t cur_seq = queue_op_seq.load();
+ uint64_t left = cur_seq - completed_op_seq.load();
+ if (cur_seq > completed_op_seq.load()) {
+ // TODO: this may also count read ops
+ dout(10) << __func__ << " existing inflight ops " << left << dendl;
+ Mutex::Locker l(flush_lock);
+ ++flush_waiters;
+ flush_waiter_seqs.insert(cur_seq);
+ while (cur_seq > completed_op_seq.load()) {
+ flush_cond.Wait(flush_lock);
+ }
+ flush_waiter_seqs.erase(cur_seq);
+ --flush_waiters;
+ }
+ }
+
+ void start() {
+ int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
+ core_id);
+ assert(r == 0);
+ }
+
+ void stop() {
+ {
+ Mutex::Locker l(queue_lock);
+ aio_stop = true;
+ queue_cond.Signal();
+ }
+ int r = rte_eal_wait_lcore(core_id);
+ assert(r == 0);
+ aio_stop = false;
+ }
+
+ ~SharedDriverQueueData() {
g_ceph_context->get_perfcounters_collection()->remove(logger);
if (qpair) {
spdk_nvme_ctrlr_free_io_qpair(qpair);
}
delete logger;
}
+};
+
+class SharedDriverData {
+ unsigned id;
+ uint32_t core_id;
+
+ std::string sn;
+ spdk_nvme_ctrlr *ctrlr;
+ spdk_nvme_ns *ns;
+ uint64_t block_size = 0;
+ uint32_t sector_size = 0;
+ uint64_t size = 0;
+ uint32_t queue_number = 0;
+ std::vector<SharedDriverQueueData*> queues;
+
+ void _aio_start() {
+ for (auto &&it : queues)
+ it->start();
+ }
+ void _aio_stop() {
+ for (auto &&it : queues)
+ it->stop();
+ }
+
+ public:
+ std::vector<NVMEDevice*> registered_devices;
+ SharedDriverData(unsigned _id, const std::string &sn_tag,
+ spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
+ : id(_id),
+ sn(sn_tag),
+ ctrlr(c),
+ ns(ns) {
+ int i;
+ sector_size = spdk_nvme_ns_get_sector_size(ns);
+ block_size = std::max(CEPH_PAGE_SIZE, sector_size);
+ size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
+
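+ // one I/O queue per slave lcore; lcore 0 is left to the master thread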
+ RTE_LCORE_FOREACH_SLAVE(i) {
+ queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
+ }
+
+ _aio_start();
+ }
bool is_equal(const string &tag) const { return sn == tag; }
+ ~SharedDriverData() {
+ for (auto p : queues) {
+ delete p;
+ }
+ }
+
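+ // map a caller-supplied id (e.g. a tid) to a queue by simple modulo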
+ SharedDriverQueueData *get_queue(uint32_t i) {
+ return queues.at(i % queue_number);
+ }
+
void register_device(NVMEDevice *device) {
// mutating registered_devices requires stopping the aio threads first;
// since register/release is rare, we can bear this cost
registered_devices.push_back(device);
_aio_start();
}
+
void remove_device(NVMEDevice *device) {
_aio_stop();
std::vector<NVMEDevice*> new_devices;
uint64_t get_size() {
return size;
}
- void queue_task(Task *t, uint64_t ops = 1) {
- queue_op_seq += ops;
- Mutex::Locker l(queue_lock);
- task_queue.push(t);
- if (queue_empty.load()) {
- queue_empty = false;
- queue_cond.Signal();
+};
+
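+// Task lives below SharedDriverQueueData because release_segs() returns
+// buffers to the owning queue's data_buf_mempool.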
+struct Task {
+ NVMEDevice *device;
+ IOContext *ctx = nullptr;
+ IOCommand command;
+ uint64_t offset;
+ uint64_t len;
+ bufferlist write_bl;
+ std::function<void()> fill_cb;
+ Task *next = nullptr;
+ int64_t return_code;
+ ceph::coarse_real_clock::time_point start;
+ IORequest io_request;
+ std::mutex lock;
+ std::condition_variable cond;
+ SharedDriverQueueData *queue = nullptr; // set by the aio thread that services this task
+ Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
+ : device(dev), command(c), offset(off), len(l),
+ return_code(rc),
+ start(ceph::coarse_real_clock::now()) {}
+ ~Task() {
+ assert(!io_request.nseg);
+ }
+ void release_segs(SharedDriverQueueData *queue_data) {
+ if (io_request.extra_segs) {
+ for (uint16_t i = 0; i < io_request.nseg; i++)
+ queue_data->data_buf_mempool.push_back(io_request.extra_segs[i]);
+ delete [] io_request.extra_segs;
+ } else if (io_request.nseg) {
+ for (uint16_t i = 0; i < io_request.nseg; i++)
+ queue_data->data_buf_mempool.push_back(io_request.inline_segs[i]);
}
+ io_request.nseg = 0;
}
- void flush_wait() {
- uint64_t cur_seq = queue_op_seq.load();
- uint64_t left = cur_seq - completed_op_seq.load();
- if (cur_seq > completed_op_seq) {
- // TODO: this may contains read op
- dout(10) << __func__ << " existed inflight ops " << left << dendl;
- Mutex::Locker l(flush_lock);
- ++flush_waiters;
- flush_waiter_seqs.insert(cur_seq);
- while (cur_seq > completed_op_seq.load()) {
- flush_cond.Wait(flush_lock);
- }
- flush_waiter_seqs.erase(cur_seq);
- --flush_waiters;
+ void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
+ uint64_t copied = 0;
+ uint64_t left = len;
+ void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs;
+ uint16_t i = 0;
+ while (left > 0) {
+ char *src = static_cast<char*>(segs[i++]);
+ uint64_t need_copy = std::min(left, data_buffer_size-off);
+ memcpy(buf+copied, src+off, need_copy);
+ off = 0;
+ left -= need_copy;
+ copied += need_copy;
}
}
+
+ void io_wait() {
+ std::unique_lock<std::mutex> l(lock);
+ cond.wait(l);
+ }
+
+ void io_wake() {
+ std::lock_guard<std::mutex> l(lock);
+ cond.notify_all();
+ }
};
static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
return 0;
}
-static int alloc_buf_from_pool(Task *t, bool write)
+int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
{
uint64_t count = t->len / data_buffer_size;
if (t->len % data_buffer_size)
return 0;
}
-void SharedDriverData::_aio_thread()
+void SharedDriverQueueData::_aio_thread()
{
dout(1) << __func__ << " start" << dendl;
Task *t = nullptr;
int r = 0;
uint64_t lba_off, lba_count;
+
ceph::coarse_real_clock::time_point cur, start
= ceph::coarse_real_clock::now();
while (true) {
}
for (; t; t = t->next) {
+ t->queue = this;
lba_off = t->offset / sector_size;
lba_count = t->len / sector_size;
switch (t->command) {
if (r < 0) {
derr << __func__ << " failed to do write command" << dendl;
t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
- t->release_segs();
+ t->release_segs(this);
delete t;
ceph_abort();
}
data_buf_reset_sgl, data_buf_next_sge);
if (r < 0) {
derr << __func__ << " failed to read" << dendl;
- t->release_segs();
+ t->release_segs(this);
delete t;
ceph_abort();
} else {
r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
if (r < 0) {
derr << __func__ << " failed to flush" << dendl;
- t->release_segs();
+ t->release_segs(this);
delete t;
ceph_abort();
} else {
flush_cond.Signal();
}
-
if (!inflight) {
- for (auto &&it : registered_devices)
- it->reap_ioc();
+ // be careful: each thread should reap only its own IOContexts; for now
+ // this is done by a single dedicated dpdk thread (queue 0)
+ if (!queueid) {
+ for (auto &&it : driver->registered_devices)
+ it->reap_ioc();
+ }
Mutex::Locker l(queue_lock);
if (queue_empty.load()) {
// only support one device per osd now!
assert(shared_driver_datas.empty());
// index 0 is occupied by the master thread
- shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, rte_get_next_lcore(-1, 1, 0), sn_tag, c, ns));
+ shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, c, ns));
*driver = shared_driver_datas.back();
}
};
{
Task *task = static_cast<Task*>(t);
IOContext *ctx = task->ctx;
- SharedDriverData *driver = task->device->get_driver();
+ SharedDriverQueueData *queue = task->queue;
+ assert(queue != NULL);
assert(ctx != NULL);
- ++driver->completed_op_seq;
+ ++queue->completed_op_seq;
auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
ceph::coarse_real_clock::now() - task->start);
if (task->command == IOCommand::WRITE_COMMAND) {
- driver->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur);
+ queue->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur);
assert(!spdk_nvme_cpl_is_error(completion));
dout(20) << __func__ << " write/zero op successfully, left "
- << driver->queue_op_seq - driver->completed_op_seq << dendl;
+ << queue->queue_op_seq - queue->completed_op_seq << dendl;
// check waiting count before doing callback (which may
// destroy this ioc).
if (!--ctx->num_running) {
task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
}
}
- task->release_segs();
+ task->release_segs(queue);
delete task;
} else if (task->command == IOCommand::READ_COMMAND) {
- driver->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
+ queue->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
assert(!spdk_nvme_cpl_is_error(completion));
dout(20) << __func__ << " read op successfully" << dendl;
task->fill_cb();
- task->release_segs();
+ task->release_segs(queue);
// read submitted by AIO
if(!task->return_code) {
if (!--ctx->num_running) {
} else {
assert(task->command == IOCommand::FLUSH_COMMAND);
assert(!spdk_nvme_cpl_is_error(completion));
- driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
+ queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
dout(20) << __func__ << " flush op successfully" << dendl;
task->return_code = 0;
ctx->aio_wake();
{
dout(10) << __func__ << " start" << dendl;
auto start = ceph::coarse_real_clock::now();
- driver->flush_wait();
+
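+ // bind this thread to a queue on first use, keyed by its tid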
+ if (queue_id == -1)
+ queue_id = ceph_gettid();
+ SharedDriverQueueData *queue = driver->get_queue(queue_id);
+ assert(queue != NULL);
+ queue->flush_wait();
auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
ceph::coarse_real_clock::now() - start);
- driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
+ queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
return 0;
}
ioc->num_pending -= pending;
assert(ioc->num_pending.load() == 0); // we should be the only thread doing this
// Only need to push the first entry
- driver->queue_task(t, pending);
+ if (queue_id == -1)
+ queue_id = ceph_gettid();
+ driver->get_queue(queue_id)->queue_task(t, pending);
ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
}
}
if (buffered) {
// Only need to push the first entry
- driver->queue_task(t);
+ if (queue_id == -1)
+ queue_id = ceph_gettid();
+ driver->get_queue(queue_id)->queue_task(t);
} else {
t->ctx = ioc;
Task *first = static_cast<Task*>(ioc->nvme_task_first);
t->copy_to_buf(buf, 0, t->len);
};
++ioc->num_reading;
- driver->queue_task(t);
+ if (queue_id == -1)
+ queue_id = ceph_gettid();
+ driver->get_queue(queue_id)->queue_task(t);
while(t->return_code > 0) {
t->io_wait();
return 0;
}
-
-
int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
{
assert(len > 0);
t->copy_to_buf(buf, off-t->offset, len);
};
++ioc.num_reading;
- driver->queue_task(t);
+ if (queue_id == -1)
+ queue_id = ceph_gettid();
+ driver->get_queue(queue_id)->queue_task(t);
while(t->return_code > 0) {
t->io_wait();