lat -= task->start;
if (task->command == IOCommand::WRITE_COMMAND) {
task->device->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, lat);
- auto left = ctx->num_running.dec();
assert(!nvme_completion_is_error(completion));
- // check waiting count before doing callback (which may
- // destroy this ioc).
dout(20) << __func__ << " write op successfully, left " << left << dendl;
- if (!left) {
- if (ctx->num_waiting.read()) {
- Mutex::Locker l(ctx->lock);
- ctx->cond.Signal();
- }
- if (task->device->aio_callback && ctx->priv) {
- task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+ // a buffered write has no ctx; we free the request later, see `flush`
+ if (ctx) {
+ // check waiting count before doing callback (which may
+ // destroy this ioc).
+ if (!ctx->num_running.dec()) {
+ if (ctx->num_waiting.read()) {
+ Mutex::Locker l(ctx->lock);
+ ctx->cond.Signal();
+ }
+ if (task->device->aio_callback && ctx->priv) {
+ task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+ }
}
+ rte_free(task->buf);
+ rte_mempool_put(task_pool, task);
+ } else {
+ task->device->queue_buffer_task(task);
}
- rte_free(task->buf);
- rte_mempool_put(task_pool, task);
} else if (task->command == IOCommand::READ_COMMAND) {
task->device->logger->tinc(l_bluestore_nvmedevice_read_lat, lat);
ctx->num_reading.dec();
aio_thread(this),
flush_lock("NVMEDevice::flush_lock"),
flush_waiters(0),
+ buffer_lock("NVMEDevice::buffer_lock"),
logger(nullptr),
inflight_ops(0),
aio_callback(cb),
Task *t;
int r = 0;
- const int max = 16;
+ const int max = 4;
uint64_t lba_off, lba_count;
utime_t lat, start = ceph_clock_now(g_ceph_context);
while (true) {
logger->tinc(l_bluestore_nvmedevice_polling_lat, lat);
if (aio_stop)
break;
+ dout(20) << __func__ << " enter sleep" << dendl;
queue_cond.Wait(queue_lock);
+ dout(20) << __func__ << " exit sleep" << dendl;
start = ceph_clock_now(g_ceph_context);
}
}
lat = ceph_clock_now(g_ceph_context);
lat -= t->start;
logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, lat);
- inflight_ops.inc();
t = t->next;
}
break;
Mutex::Locker l(t->ctx->lock);
t->ctx->cond.Signal();
} else {
- inflight_ops.inc();
lat = ceph_clock_now(g_ceph_context);
lat -= t->start;
logger->tinc(l_bluestore_nvmedevice_read_queue_lat, lat);
Mutex::Locker l(t->ctx->lock);
t->ctx->cond.Signal();
} else {
- inflight_ops.inc();
lat = ceph_clock_now(g_ceph_context);
lat -= t->start;
logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, lat);
}
}
} else if (inflight_ops.read()) {
- dout(20) << __func__ << " idle, have a pause" << dendl;
-
+ nvme_ctrlr_process_io_completions(ctrlr, max);
+ dout(30) << __func__ << " idle, pausing briefly" << dendl;
#ifdef HAVE_SSE
_mm_pause();
#else
usleep(10);
#endif
}
-
- nvme_ctrlr_process_io_completions(ctrlr, max);
reap_ioc();
}
nvme_unregister_io_thread();
}
flush_waiters.dec();
}
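+ // any buffered data is now stable on the device: drop the read overlay
+ // and free the tasks the completion thread parked via queue_buffer_task()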
+ Task *t = nullptr;
+ {
+ Mutex::Locker l(buffer_lock);
+ buffered_extents.clear();
+ t = buffered_task_head;
+ buffered_task_head = nullptr;
+ }
+ while (t) {
+ Task *next = t->next; // grab next before returning t to the pool
+ rte_free(t->buf);
+ rte_mempool_put(task_pool, t);
+ t = next;
+ }
utime_t lat = ceph_clock_now(g_ceph_context);
lat -= start;
logger->tinc(l_bluestore_nvmedevice_flush_lat, lat);
ioc->num_running.add(pending);
ioc->num_pending.sub(pending);
- assert(ioc->num_pending.read() == 0); // we should be only thread doing this
+ assert(ioc->num_pending.read() == 0); // we should be the only thread doing this
- Mutex::Locker l(queue_lock);
// Only need to push the first entry
- task_queue.push(t);
- if (queue_empty.read()) {
- queue_empty.dec();
- queue_cond.Signal();
- }
+ queue_task(t, pending);
ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
}
}
bool buffered)
{
uint64_t len = bl.length();
- dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
+ dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc
+ << " buffered " << buffered << dendl;
assert(off % block_size == 0);
assert(len % block_size == 0);
assert(len > 0);
Task *t;
int r = rte_mempool_get(task_pool, (void **)&t);
if (r < 0) {
- derr << __func__ << " task_pool rte_mempool_get failed" << dendl;
return r;
}
t->start = ceph_clock_now(g_ceph_context);
}
bl.copy(0, len, static_cast<char*>(t->buf));
- t->ctx = ioc;
t->command = IOCommand::WRITE_COMMAND;
t->offset = off;
t->len = len;
t->device = this;
t->return_code = 0;
+ t->next = nullptr;
if (buffered) {
+ t->ctx = nullptr;
+ {
+ // publish the buffered range before queueing the write, so a concurrent
+ // read cannot miss data the caller already treats as written
+ Mutex::Locker l(buffer_lock);
+ buffered_extents.insert(off, len, (char*)t->buf);
+ }
+ queue_task(t);
+ } else {
+ t->ctx = ioc;
+ Task *first = static_cast<Task*>(ioc->nvme_task_first);
+ Task *last = static_cast<Task*>(ioc->nvme_task_last);
+ if (last)
+ last->next = t;
+ if (!first)
+ ioc->nvme_task_first = t;
+ ioc->nvme_task_last = t;
+ ioc->num_pending.inc();
}
- Task *first = static_cast<Task*>(ioc->nvme_task_first);
- Task *last = static_cast<Task*>(ioc->nvme_task_last);
- if (last)
- last->next = t;
- t->next = nullptr;
- if (!first)
- ioc->nvme_task_first = t;
- ioc->nvme_task_last = t;
- ioc->num_pending.inc();
dout(5) << __func__ << " " << off << "~" << len << dendl;
t->return_code = 1;
t->next = nullptr;
- ioc->num_reading.inc();;
+ ioc->num_reading.inc();
- {
- Mutex::Locker l(queue_lock);
- task_queue.push(t);
- if (queue_empty.read()) {
- queue_empty.dec();
- queue_cond.Signal();
- }
- }
+ queue_task(t);
{
Mutex::Locker l(ioc->lock);
ioc->cond.Wait(ioc->lock);
}
memcpy(p.c_str(), t->buf, len);
+ {
+ Mutex::Locker l(buffer_lock);
+ uint64_t copied = buffered_extents.read_overlap(off, len, p.c_str());
+ dout(10) << __func__ << " read from buffer " << copied << " bytes" << dendl;
+ }
pbl->clear();
pbl->push_back(p);
r = t->return_code;
t->return_code = 1;
t->next = nullptr;
- ioc.num_reading.inc();;
+ ioc.num_reading.inc();
- {
- Mutex::Locker l(queue_lock);
- task_queue.push(t);
- if (queue_empty.read()) {
- queue_empty.dec();
- queue_cond.Signal();
- }
- }
+ queue_task(t);
{
Mutex::Locker l(ioc.lock);
ioc.cond.Wait(ioc.lock);
}
memcpy(buf, (char*)t->buf+off-aligned_off, len);
+ {
+ Mutex::Locker l(buffer_lock);
+ uint64_t copied = buffered_extents.read_overlap(off, len, buf);
+ dout(10) << __func__ << " read from buffer " << copied << " bytes" << dendl;
+ }
r = t->return_code;
rte_free(t->buf);
rte_mempool_put(task_pool, t);
#define CEPH_OS_BLUESTORE_NVMEDEVICE
#include <queue>
+#include <map>
#include <pciaccess.h>
+#include <limits>
// since _Static_assert introduced in c11
#define _Static_assert static_assert
#endif
#include "include/atomic.h"
+#include "include/interval_set.h"
#include "include/utime.h"
#include "common/Mutex.h"
#include "BlockDevice.h"
Cond queue_cond;
std::queue<Task*> task_queue;
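+ // hand a task (or a chain of `ops` pre-linked tasks) to the aio thread,
+ // accounting them as inflight and waking the thread if the queue was empty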
+ void queue_task(Task *t, uint64_t ops = 1) {
+ inflight_ops.add(ops);
+ Mutex::Locker l(queue_lock);
+ task_queue.push(t);
+ if (queue_empty.read()) {
+ queue_empty.dec();
+ queue_cond.Signal();
+ }
+ }
+
struct AioCompletionThread : public Thread {
NVMEDevice *dev;
AioCompletionThread(NVMEDevice *b) : dev(b) {}
Cond flush_cond;
atomic_t flush_waiters;
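+ // Read overlay for buffered (not yet flushed) writes. Extents reference
+ // the still-live task buffers; reads consult the overlay after hitting
+ // the device, and flush() discards it once the data is stable.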
+ struct BufferedExtents {
+ struct Extent {
+ uint64_t x_len; // live length of this extent
+ uint64_t x_off; // offset of the live range within `data`
+ const char *data; // backing task buffer (owned by the parked Task)
+ uint64_t data_len; // total length of `data`
+ };
+ using Offset = uint64_t;
+ map<Offset, Extent> buffered_extents;
+ uint64_t left_edge = std::numeric_limits<uint64_t>::max(); // lowest buffered offset
+ uint64_t right_edge = 0; // end (exclusive) of the highest buffered extent
+
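+ // debug helper: assert that no two extents overlap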
+ void verify() {
+ interval_set<uint64_t> m;
+ for (auto &p : buffered_extents) {
+ assert(!m.intersects(p.first, p.second.x_len));
+ m.insert(p.first, p.second.x_len);
+ }
+ }
+
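+ // record [off, off+len) as buffered, trimming or splitting any existing
+ // extents it overlaps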
+ void insert(uint64_t off, uint64_t len, const char *data) {
+ auto it = buffered_extents.lower_bound(off);
+ if (it != buffered_extents.begin()) {
+ --it;
+ if (it->first + it->second.x_len <= off)
+ ++it;
+ }
+ uint64_t end = off + len;
+ if (off < left_edge)
+ left_edge = off;
+ if (end > right_edge)
+ right_edge = end;
+ while (it != buffered_extents.end()) {
+ if (it->first >= end)
+ break;
+ uint64_t extent_it_end = it->first + it->second.x_len;
+ assert(extent_it_end >= off);
+ if (it->first <= off) {
+ if (extent_it_end > end) {
+ // <- data ->
+ // <- it ->
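+ // the new data splits this extent: shrink it to the left piece, then
+ // re-add the right piece at `end` (its x_off advances by the shrunken
+ // x_len plus the new data's length)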
+ it->second.x_len -= (extent_it_end - off);
+ buffered_extents[end] = Extent{
+ extent_it_end - end, it->second.x_off + it->second.x_len + len, it->second.data, it->second.data_len};
+ } else {
+ // <- data ->
+ // <- it ->
+ assert(extent_it_end <= end);
+ it->second.x_len -= (extent_it_end - off);
+ }
+ ++it;
+ } else {
+ assert(it->first > off);
+ if (extent_it_end > end) {
+ // <- data ->
+ // <- it ->
+ uint64_t overlap = end - it->first;
+ buffered_extents[end] = Extent{
+ it->second.x_len - overlap, it->second.x_off + overlap, it->second.data, it->second.data_len};
+ } else {
+ // <- data ->
+ // <- it ->
+ }
+ buffered_extents.erase(it++);
+ }
+ }
+ buffered_extents[off] = Extent{
+ len, 0, data, len};
+
+ if (0)
+ verify();
+ }
+
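+ // memcpy wrapper with (disabled) bounds checks against the source extent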
+ void memcpy_check(char *dst, uint64_t dst_raw_len, uint64_t dst_off,
+ map<Offset, Extent>::iterator &it, uint64_t src_off, uint64_t copylen) {
+ if (0) {
+ assert(dst_off + copylen <= dst_raw_len);
+ assert(it->second.x_off + src_off + copylen <= it->second.data_len);
+ }
+ memcpy(dst + dst_off, it->second.data + it->second.x_off + src_off, copylen);
+ }
+
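+ // copy any buffered data overlapping [off, off+len) into buf, which
+ // already holds the data read from the device; returns bytes copied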
+ uint64_t read_overlap(uint64_t off, uint64_t len, char *buf) {
+ uint64_t end = off + len;
+ if (end <= left_edge || off >= right_edge)
+ return 0;
+
+ uint64_t copied = 0;
+ auto it = buffered_extents.lower_bound(off);
+ if (it != buffered_extents.begin()) {
+ --it;
+ if (it->first + it->second.x_len <= off)
+ ++it;
+ }
+ uint64_t copy_len;
+ while (it != buffered_extents.end()) {
+ if (it->first >= end)
+ break;
+ uint64_t extent_it_end = it->first + it->second.x_len;
+ assert(extent_it_end >= off);
+ if (it->first >= off) {
+ if (extent_it_end > end) {
+ // <- data ->
+ // <- it ->
+ copy_len = len - (it->first - off);
+ memcpy_check(buf, len, it->first - off, it, 0, copy_len);
+ } else {
+ // <- data ->
+ // <- it ->
+ copy_len = it->second.x_len;
+ memcpy_check(buf, len, it->first - off, it, 0, copy_len);
+ }
+ } else {
+ if (extent_it_end > end) {
+ // <- data ->
+ // <- it ->
+ copy_len = len;
+ memcpy_check(buf, len, 0, it, off - it->first, copy_len);
+ } else {
+ // <- data ->
+ // <- it ->
+ assert(extent_it_end <= end);
+ copy_len = it->first + it->second.x_len - off;
+ memcpy_check(buf, len, 0, it, off - it->first, copy_len);
+ }
+ }
+ copied += copy_len;
+ ++it;
+ }
+ return copied;
+ }
+
+ void clear() {
+ buffered_extents.clear();
+ left_edge = std::numeric_limits<uint64_t>::max();
+ right_edge = 0;
+ }
+ };
+ Mutex buffer_lock;
+ BufferedExtents buffered_extents;
+ Task *buffered_task_head = nullptr;
+
static void init();
+ public:
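+ // called by the completion thread for a finished buffered write: park
+ // the task until flush() frees its buffer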
+ void queue_buffer_task(Task *t) {
+ Mutex::Locker l(buffer_lock);
+ assert(t->next == nullptr);
+ t->next = buffered_task_head;
+ buffered_task_head = t;
+ }
public:
PerfCounters *logger;