From: Haomai Wang Date: Tue, 19 Jan 2016 10:26:58 +0000 (+0800) Subject: NVMEDevice: add buffer write support X-Git-Tag: v10.0.4~81^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=81defd1643a3c56a7deb9c05028c0136b2386ea6;p=ceph.git NVMEDevice: add buffer write support Signed-off-by: Haomai Wang --- diff --git a/src/os/bluestore/NVMEDevice.cc b/src/os/bluestore/NVMEDevice.cc index 3de8f3b595c..ad96d34c3b9 100644 --- a/src/os/bluestore/NVMEDevice.cc +++ b/src/os/bluestore/NVMEDevice.cc @@ -70,22 +70,26 @@ static void io_complete(void *t, const struct nvme_completion *completion) { lat -= task->start; if (task->command == IOCommand::WRITE_COMMAND) { task->device->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, lat); - auto left = ctx->num_running.dec(); assert(!nvme_completion_is_error(completion)); - // check waiting count before doing callback (which may - // destroy this ioc). dout(20) << __func__ << " write op successfully, left " << left << dendl; - if (!left) { - if (ctx->num_waiting.read()) { - Mutex::Locker l(ctx->lock); - ctx->cond.Signal(); - } - if (task->device->aio_callback && ctx->priv) { - task->device->aio_callback(task->device->aio_callback_priv, ctx->priv); + // buffer write won't have ctx, and we will free request later, see `flush` + if (ctx) { + // check waiting count before doing callback (which may + // destroy this ioc). + if (!ctx->num_running.dec()) { + if (ctx->num_waiting.read()) { + Mutex::Locker l(ctx->lock); + ctx->cond.Signal(); + } + if (task->device->aio_callback && ctx->priv) { + task->device->aio_callback(task->device->aio_callback_priv, ctx->priv); + } } + rte_free(task->buf); + rte_mempool_put(task_pool, task); + } else { + task->device->queue_buffer_task(task); } - rte_free(task->buf); - rte_mempool_put(task_pool, task); } else if (task->command == IOCommand::READ_COMMAND) { task->device->logger->tinc(l_bluestore_nvmedevice_read_lat, lat); ctx->num_reading.dec(); @@ -312,6 +316,7 @@ NVMEDevice::NVMEDevice(aio_callback_t cb, void *cbpriv) aio_thread(this), flush_lock("NVMEDevice::flush_lock"), flush_waiters(0), + buffer_lock("NVMEDevice::buffer_lock"), logger(nullptr), inflight_ops(0), aio_callback(cb), @@ -416,7 +421,7 @@ void NVMEDevice::_aio_thread() Task *t; int r = 0; - const int max = 16; + const int max = 4; uint64_t lba_off, lba_count; utime_t lat, start = ceph_clock_now(g_ceph_context); while (true) { @@ -443,7 +448,9 @@ void NVMEDevice::_aio_thread() logger->tinc(l_bluestore_nvmedevice_polling_lat, lat); if (aio_stop) break; + dout(20) << __func__ << " enter sleep" << dendl; queue_cond.Wait(queue_lock); + dout(20) << __func__ << " exit sleep" << dendl; start = ceph_clock_now(g_ceph_context); } } @@ -467,7 +474,6 @@ void NVMEDevice::_aio_thread() lat = ceph_clock_now(g_ceph_context); lat -= t->start; logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, lat); - inflight_ops.inc(); t = t->next; } break; @@ -485,7 +491,6 @@ void NVMEDevice::_aio_thread() Mutex::Locker l(t->ctx->lock); t->ctx->cond.Signal(); } else { - inflight_ops.inc(); lat = ceph_clock_now(g_ceph_context); lat -= t->start; logger->tinc(l_bluestore_nvmedevice_read_queue_lat, lat); @@ -502,7 +507,6 @@ void NVMEDevice::_aio_thread() Mutex::Locker l(t->ctx->lock); t->ctx->cond.Signal(); } else { - inflight_ops.inc(); lat = ceph_clock_now(g_ceph_context); lat -= t->start; logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, lat); @@ -511,16 +515,14 @@ void NVMEDevice::_aio_thread() } } } else if (inflight_ops.read()) { - dout(20) << __func__ << " idle, have a pause" << dendl; - + nvme_ctrlr_process_io_completions(ctrlr, max); + dout(30) << __func__ << " idle, have a pause" << dendl; #ifdef HAVE_SSE _mm_pause(); #else usleep(10); #endif } - - nvme_ctrlr_process_io_completions(ctrlr, max); reap_ioc(); } nvme_unregister_io_thread(); @@ -541,6 +543,18 @@ int NVMEDevice::flush() } flush_waiters.dec(); } + Task *t = nullptr; + { + Mutex::Locker l(buffer_lock); + buffered_extents.clear(); + t = buffered_task_head; + buffered_task_head = nullptr; + } + while (t) { + rte_free(t->buf); + rte_mempool_put(task_pool, t); + t = t->next; + } utime_t lat = ceph_clock_now(g_ceph_context); lat -= start; logger->tinc(l_bluestore_nvmedevice_flush_lat, lat); @@ -595,13 +609,8 @@ void NVMEDevice::aio_submit(IOContext *ioc) ioc->num_running.add(pending); ioc->num_pending.sub(pending); assert(ioc->num_pending.read() == 0); // we should be only thread doing this - Mutex::Locker l(queue_lock); // Only need to push the first entry - task_queue.push(t); - if (queue_empty.read()) { - queue_empty.dec(); - queue_cond.Signal(); - } + queue_task(t, pending); ioc->nvme_task_first = ioc->nvme_task_last = nullptr; } } @@ -613,7 +622,8 @@ int NVMEDevice::aio_write( bool buffered) { uint64_t len = bl.length(); - dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl; + dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc + << " buffered " << buffered << dendl; assert(off % block_size == 0); assert(len % block_size == 0); assert(len > 0); @@ -623,7 +633,6 @@ int NVMEDevice::aio_write( Task *t; int r = rte_mempool_get(task_pool, (void **)&t); if (r < 0) { - derr << __func__ << " task_pool rte_mempool_get failed" << dendl; return r; } t->start = ceph_clock_now(g_ceph_context); @@ -636,24 +645,30 @@ int NVMEDevice::aio_write( } bl.copy(0, len, static_cast(t->buf)); - t->ctx = ioc; t->command = IOCommand::WRITE_COMMAND; t->offset = off; t->len = len; t->device = this; t->return_code = 0; + t->next = nullptr; if (buffered) { + t->ctx = nullptr; + // Only need to push the first entry + queue_task(t); + Mutex::Locker l(buffer_lock); + buffered_extents.insert(off, len, (char*)t->buf); + } else { + t->ctx = ioc; + Task *first = static_cast(ioc->nvme_task_first); + Task *last = static_cast(ioc->nvme_task_last); + if (last) + last->next = t; + if (!first) + ioc->nvme_task_first = t; + ioc->nvme_task_last = t; + ioc->num_pending.inc(); } - Task *first = static_cast(ioc->nvme_task_first); - Task *last = static_cast(ioc->nvme_task_last); - if (last) - last->next = t; - t->next = nullptr; - if (!first) - ioc->nvme_task_first = t; - ioc->nvme_task_last = t; - ioc->num_pending.inc(); dout(5) << __func__ << " " << off << "~" << len << dendl; @@ -719,14 +734,7 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, t->return_code = 1; t->next = nullptr; ioc->num_reading.inc();; - { - Mutex::Locker l(queue_lock); - task_queue.push(t); - if (queue_empty.read()) { - queue_empty.dec(); - queue_cond.Signal(); - } - } + queue_task(t); { Mutex::Locker l(ioc->lock); @@ -734,6 +742,11 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, ioc->cond.Wait(ioc->lock); } memcpy(p.c_str(), t->buf, len); + { + Mutex::Locker l(buffer_lock); + uint64_t copied = buffered_extents.read_overlap(off, len, (char*)t->buf); + dout(10) << __func__ << " read from buffer " << copied << dendl; + } pbl->clear(); pbl->push_back(p); r = t->return_code; @@ -782,14 +795,7 @@ int NVMEDevice::read_buffered(uint64_t off, uint64_t len, char *buf) t->return_code = 1; t->next = nullptr; ioc.num_reading.inc();; - { - Mutex::Locker l(queue_lock); - task_queue.push(t); - if (queue_empty.read()) { - queue_empty.dec(); - queue_cond.Signal(); - } - } + queue_task(t); { Mutex::Locker l(ioc.lock); @@ -797,6 +803,11 @@ int NVMEDevice::read_buffered(uint64_t off, uint64_t len, char *buf) ioc.cond.Wait(ioc.lock); } memcpy(buf, (char*)t->buf+off-aligned_off, len); + { + Mutex::Locker l(buffer_lock); + uint64_t copied = buffered_extents.read_overlap(off, len, buf); + dout(10) << __func__ << " read from buffer " << copied << dendl; + } r = t->return_code; rte_free(t->buf); rte_mempool_put(task_pool, t); diff --git a/src/os/bluestore/NVMEDevice.h b/src/os/bluestore/NVMEDevice.h index 9ef70534706..46637c55bcb 100644 --- a/src/os/bluestore/NVMEDevice.h +++ b/src/os/bluestore/NVMEDevice.h @@ -18,7 +18,9 @@ #define CEPH_OS_BLUESTORE_NVMEDEVICE #include +#include #include +#include // since _Static_assert introduced in c11 #define _Static_assert static_assert @@ -35,6 +37,7 @@ extern "C" { #endif #include "include/atomic.h" +#include "include/interval_set.h" #include "include/utime.h" #include "common/Mutex.h" #include "BlockDevice.h" @@ -81,6 +84,16 @@ class NVMEDevice : public BlockDevice { Cond queue_cond; std::queue task_queue; + void queue_task(Task *t, uint64_t ops = 1) { + inflight_ops.add(ops); + Mutex::Locker l(queue_lock); + task_queue.push(t); + if (queue_empty.read()) { + queue_empty.dec(); + queue_cond.Signal(); + } + } + struct AioCompletionThread : public Thread { NVMEDevice *dev; AioCompletionThread(NVMEDevice *b) : dev(b) {} @@ -96,7 +109,156 @@ class NVMEDevice : public BlockDevice { Cond flush_cond; atomic_t flush_waiters; + struct BufferedExtents { + struct Extent { + uint64_t x_len; + uint64_t x_off; + const char *data; + uint64_t data_len; + }; + using Offset = uint64_t; + map buffered_extents; + uint64_t left_edge = std::numeric_limits::max(); + uint64_t right_edge = 0; + + void verify() { + interval_set m; + for (auto && it : buffered_extents) { + assert(!m.intersects(it.first, it.second.x_len)); + m.insert(it.first, it.second.x_len); + } + } + + void insert(uint64_t off, uint64_t len, const char *data) { + auto it = buffered_extents.lower_bound(off); + if (it != buffered_extents.begin()) { + --it; + if (it->first + it->second.x_len <= off) + ++it; + } + uint64_t end = off + len; + if (off < left_edge) + left_edge = off; + if (end > right_edge) + right_edge = end; + while (it != buffered_extents.end()) { + if (it->first >= end) + break; + uint64_t extent_it_end = it->first + it->second.x_len; + assert(extent_it_end >= off); + if (it->first <= off) { + if (extent_it_end > end) { + // <- data -> + // <- it -> + it->second.x_len -= (extent_it_end - off); + buffered_extents[end] = Extent{ + extent_it_end - end, it->second.x_off + it->second.x_len + len, it->second.data, it->second.data_len}; + } else { + // <- data -> + // <- it -> + assert(extent_it_end <= end); + it->second.x_len -= (extent_it_end - off); + } + ++it; + } else { + assert(it->first > off) ; + if (extent_it_end > end) { + // <- data -> + // <- it -> + uint64_t overlap = end - it->first; + buffered_extents[end] = Extent{ + it->second.x_len - overlap, it->second.x_off + overlap, it->second.data, it->second.data_len}; + } else { + // <- data -> + // <- it -> + } + buffered_extents.erase(it++); + } + } + buffered_extents[off] = Extent{ + len, 0, data, len}; + + if (0) + verify(); + } + + void memcpy_check(char *dst, uint64_t dst_raw_len, uint64_t dst_off, + map::iterator &it, uint64_t src_off, uint64_t copylen) { + if (0) { + assert(dst_off + copylen <= dst_raw_len); + assert(it->second.x_off + src_off + copylen <= it->second.data_len); + } + memcpy(dst + dst_off, it->second.data + it->second.x_off + src_off, copylen); + } + + uint64_t read_overlap(uint64_t off, uint64_t len, char *buf) { + uint64_t end = off + len; + if (end <= left_edge || off >= right_edge) + return 0; + + uint64_t copied = 0; + auto it = buffered_extents.lower_bound(off); + if (it != buffered_extents.begin()) { + --it; + if (it->first + it->second.x_len <= off) + ++it; + } + uint64_t copy_len; + while (it != buffered_extents.end()) { + if (it->first >= end) + break; + uint64_t extent_it_end = it->first + it->second.x_len; + assert(extent_it_end >= off); + if (it->first >= off) { + if (extent_it_end > end) { + // <- data -> + // <- it -> + copy_len = len - (it->first - off); + memcpy_check(buf, len, it->first - off, it, 0, copy_len); + } else { + // <- data -> + // <- it -> + copy_len = it->second.x_len; + memcpy_check(buf, len, it->first - off, it, 0, copy_len); + } + } else { + if (extent_it_end > end) { + // <- data -> + // <- it -> + copy_len = len; + memcpy_check(buf, len, 0, it, off - it->first, copy_len); + } else { + // <- data -> + // <- it -> + assert(extent_it_end <= end); + copy_len = it->first + it->second.x_len - off; + memcpy_check(buf, len, 0, it, off - it->first, copy_len); + } + } + copied += copy_len; + ++it; + } + return copied; + } + + void clear() { + buffered_extents.clear(); + left_edge = std::numeric_limits::max(); + right_edge = 0; + } + }; + Mutex buffer_lock; + BufferedExtents buffered_extents; + Task *buffered_task_head = nullptr; + static void init(); + public: + void queue_buffer_task(Task *t) { + Mutex::Locker l(buffer_lock); + assert(t->next == nullptr); + t->next = buffered_task_head; + buffered_task_head = t; + } public: PerfCounters *logger;