#undef dout_prefix
#define dout_prefix *_dout << "bdev "
-void IOContext::aio_wait()
-{
- Mutex::Locker l(lock);
- // see _aio_thread for waker logic
- num_waiting.inc();
- while (num_running.read() > 0 || num_reading.read() > 0) {
- dout(10) << __func__ << " " << this
- << " waiting for " << num_running.read() << " aios and/or "
- << num_reading.read() << " readers to complete" << dendl;
- cond.Wait(lock);
- }
- num_waiting.dec();
- dout(20) << __func__ << " " << this << " done" << dendl;
-}
-
-static void io_complete(void *ctx, const struct nvme_completion *completion) {
- if (nvme_completion_is_error(completion)) {
- assert(0);
- }
-
- IOContext *ioc = (IOContext*)ctx;
- NVMEDevice *device = (NVMEDevice*)ioc->backend;
- ioc->done = true;
- if (ioc->priv) {
- device->aio_callback(device->aio_callback_priv, ioc->priv);
+struct rte_mempool *request_mempool;
+static struct rte_mempool *task_pool;
+
+static void io_complete(void *t, const struct nvme_completion *completion) {
+ Task *task = static_cast<Task*>(t);
+ IOContext *ctx = task->ctx;
+ if (task->command == IOCommand::WRITE_COMMAND) {
+ auto left = ctx->num_running.dec();
+ assert(!nvme_completion_is_error(completion));
+ // check waiting count before doing callback (which may
+ // destroy this ioc).
+ if (!left) {
+ ctx->backend_priv = nullptr;
+ if (ctx->num_waiting.read()) {
+ Mutex::Locker l(ctx->lock);
+ ctx->cond.Signal();
+ }
+ if (ctx->priv) {
+ task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+ }
+ }
+ rte_free(task->buf);
+ rte_mempool_put(task_pool, task);
+ } else {
+ assert(task->command == IOCommand::READ_COMMAND);
+ ctx->num_reading.dec();
+ if (nvme_completion_is_error(completion))
+ task->read_code = -1; // FIXME
+ else
+ task->read_code = 0;
+ Mutex::Locker l(ctx->lock);
+ ctx->cond.Signal();
}
}
NVMEDevice::NVMEDevice(aio_callback_t cb, void *cbpriv)
: ctrlr(nullptr),
ns(nullptr),
+ aio_stop(false),
+ queue_lock("NVMEDevice::queue_lock"),
+ aio_thread(this),
aio_callback(cb),
aio_callback_priv(cbpriv)
{
}
static char *ealargs[] = {
- "perf",
+ "ceph-osd",
"-c 0x1", /* This must be the second parameter. It is overwritten by index in main(). */
"-n 4",
};
nvme_request_size(), 128, 0,
NULL, NULL, NULL, NULL,
SOCKET_ID_ANY, 0);
+ if (request_mempool == NULL)
+ assert(0);
+
+ task_pool = rte_mempool_create(
+ "task_pool", 8192, sizeof(Task),
+ 64, 0, NULL, NULL, NULL, NULL,
+ SOCKET_ID_ANY, 0);
if (request_mempool == NULL)
assert(0);
}
pci_iterator_destroy(iter);
+ aio_thread.create();
+
dout(1) << __func__ << " size " << size << " (" << pretty_si_t(size) << "B)"
<< " block_size " << block_size << " (" << pretty_si_t(block_size)
<< "B)" << dendl;
void NVMEDevice::close()
{
dout(1) << __func__ << dendl;
- nvme_unregister_io_thread();
+
+ aio_stop = true;
+ aio_thread.join();
+ aio_stop = false;
name.clear();
}
+void NVMEDevice::_aio_thread()
+{
+ dout(10) << __func__ << " start" << dendl;
+ if (nvme_register_io_thread() != 0) {
+ assert(0);
+ }
+
+ Task *t;
+ int r = 0;
+ const int max = 16;
+ while (!aio_stop) {
+ dout(40) << __func__ << " polling" << dendl;
+ {
+ Mutex::Locker l(queue_lock);
+ if (!task_queue.empty()) {
+ t = task_queue.front();
+ task_queue.pop();
+ }
+ }
+
+ if (t) {
+ switch (t->command) {
+ case IOCommand::WRITE_COMMAND:
+ {
+ while (t) {
+ r = nvme_ns_cmd_write(ns, t->buf, t->offset, t->len / block_size, io_complete, t);
+ if (r < 0) {
+ t->ctx->backend_priv = nullptr;
+ rte_free(t->buf);
+ rte_mempool_put(task_pool, t);
+ derr << __func__ << " failed to do write command" << dendl;
+ assert(0);
+ }
+ t = t->next;
+ }
+ break;
+ }
+ case IOCommand::READ_COMMAND:
+ {
+ r = nvme_ns_cmd_read(ns, t->buf, t->offset, t->len / block_size, io_complete, t);
+ if (r < 0) {
+ derr << __func__ << " failed to read" << dendl;
+ t->ctx->num_reading.dec();
+ t->read_code = r;
+ Mutex::Locker l(t->ctx->lock);
+ t->ctx->cond.Signal();
+ }
+ break;
+ }
+ }
+ }
+
+ nvme_ctrlr_process_io_completions(ctrlr, max);
+ }
+ nvme_unregister_io_thread();
+ dout(10) << __func__ << " end" << dendl;
+}
+
int NVMEDevice::flush()
{
dout(10) << __func__ << " start" << dendl;
void NVMEDevice::aio_submit(IOContext *ioc)
{
- while (!ioc->done) {
- nvme_ctrlr_process_io_completions(ctrlr, 0);
- usleep(50);
- }
+ dout(20) << __func__ << " ioc " << ioc << " pending "
+ << ioc->num_pending.read() << " running "
+ << ioc->num_running.read() << dendl;
+ Task *t = static_cast<Task*>(ioc->backend_priv);
+ int pending = ioc->num_pending.read();
+ ioc->num_running.add(pending);
+ ioc->num_pending.sub(pending);
+ assert(ioc->num_pending.read() == 0); // we should be only thread doing this
+ Mutex::Locker l(queue_lock);
+ // Only need to push the first entry
+ task_queue.push(t);
}
int NVMEDevice::aio_write(
assert(off < size);
assert(off + len <= size);
- if (!bl.is_n_page_sized() || !bl.is_page_aligned()) {
- dout(20) << __func__ << " rebuilding buffer to be page-aligned" << dendl;
- bl.rebuild();
- }
-
- ioc->backend = this;
- int rc = nvme_ns_cmd_write(ns, bl.c_str(), off,
- bl.length()/block_size, io_complete, ioc);
- if (rc < 0) {
- derr << __func__ << " failed to do write command" << dendl;
- return rc;
+ Task *t;
+ int r = rte_mempool_get(task_pool, (void **)&t);
+ if (r < 0) {
+ derr << __func__ << " task_pool rte_mempool_get failed" << dendl;
+ return r;
+ }
+
+ t->buf = rte_malloc(NULL, bl.length(), block_size);
+ if (t->buf == NULL) {
+ derr << __func__ << " task->buf rte_malloc failed" << dendl;
+ rte_mempool_put(task_pool, t);
+ return -ENOMEM;
+ }
+ bl.copy(0, bl.length(), static_cast<char*>(t->buf));
+
+ t->ctx = ioc;
+ t->command = IOCommand::WRITE_COMMAND;
+ t->offset = off;
+ t->len = len;
+ t->device = this;
+ if (ioc->backend_priv) {
+ Task *prev = static_cast<Task*>(ioc->backend_priv);
+ prev->next = t;
+ } else {
+ ioc->backend_priv = t;
}
+ t->next = nullptr;
+ ioc->num_pending.inc();
dout(5) << __func__ << " " << off << "~" << len << dendl;
assert(off < size);
assert(off + len <= size);
- bufferptr p = buffer::create_page_aligned(len);
- ioc->backend = this;
- int r = nvme_ns_cmd_read(ns, p.c_str(), off, len / block_size, io_complete, ioc);
+ Task *t;
+ int r = rte_mempool_get(task_pool, (void **)&t);
if (r < 0) {
- r = -errno;
- derr << __func__ << " failed to read" << dendl;
+ derr << __func__ << " task_pool rte_mempool_get failed" << dendl;
return r;
}
- while (!ioc->done) {
- nvme_ctrlr_process_io_completions(ctrlr, 0);
- usleep(50);
+
+ bufferptr p = buffer::create_page_aligned(len);
+ t->buf = rte_malloc(NULL, len, block_size);
+ if (t->buf == NULL) {
+ derr << __func__ << " task->buf rte_malloc failed" << dendl;
+ r = -ENOMEM;
+ goto out;
+ }
+ t->ctx = ioc;
+ t->command = IOCommand::READ_COMMAND;
+ t->offset = off;
+ t->len = len;
+ t->device = this;
+ t->read_code = 1;
+ assert(!ioc->backend_priv);
+ ioc->num_reading.inc();;
+ {
+ Mutex::Locker l(queue_lock);
+ task_queue.push(t);
+ }
+
+ {
+ Mutex::Locker l(ioc->lock);
+ while (t->read_code > 0)
+ ioc->cond.Wait(ioc->lock);
}
+ memcpy(p.c_str(), t->buf, len);
pbl->clear();
pbl->push_back(p);
-
- return r < 0 ? r : 0;
+ r = t->read_code;
+ rte_free(t);
+
+ out:
+ rte_mempool_put(task_pool, t);
+ if (ioc->num_waiting.read()) {
+ dout(20) << __func__ << " waking waiter" << dendl;
+ Mutex::Locker l(ioc->lock);
+ ioc->cond.Signal();
+ }
+ return r;
}
int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)