std::function<void()> fill_cb;
Task *next = nullptr;
int64_t return_code;
+ Task *primary = nullptr;
ceph::coarse_real_clock::time_point start;
- IORequest io_request;
+ IORequest io_request = {};
ceph::mutex lock = ceph::make_mutex("Task::lock");
ceph::condition_variable cond;
SharedDriverQueueData *queue = nullptr;
- Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
+ // Number of subtasks that currently name this task as their primary:
+ // incremented in a subtask's constructor, decremented in its destructor.
+ int ref = 0;
+ // Builds a task for command c on device dev with offset off and length l.
+ // rc is the initial return_code. When p is non-null this task becomes a
+ // subtask of p: it registers itself on p (p->ref++) and starts from p's
+ // current return_code instead of rc.
+ Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0,
+ Task *p = nullptr)
: device(dev), command(c), offset(off), len(l),
- return_code(rc),
- start(ceph::coarse_real_clock::now()) {}
+ return_code(rc), primary(p),
+ start(ceph::coarse_real_clock::now()) {
+ if (primary) {
+ // Take a reference on the primary; ~Task() drops it again.
+ primary->ref++;
+ // A subtask inherits the primary's return code, overriding rc.
+ return_code = primary->return_code;
+ }
+ }
+ // Drops the reference this task took on its primary in the constructor
+ // (if it has one) and asserts that io_request.nseg is zero.
~Task() {
+ if (primary)
+ primary->ref--;
ceph_assert(!io_request.nseg);
}
void release_segs(SharedDriverQueueData *queue_data) {
}
delete task;
} else {
- task->return_code = 0;
+ if (Task* primary = task->primary; primary != nullptr) {
+ delete task;
+ if (!primary->ref)
+ primary->return_code = 0;
+ } else {
+ task->return_code = 0;
+ }
ctx->try_aio_wake();
}
} else {
}
}
+/// Append task t to the NVMe task list kept on ioc.
+///
+/// t is linked after the current tail (when there is one), becomes the head
+/// when ioc has no head yet, is recorded as the new tail, and
+/// ioc->num_pending is incremented by one.
+static void ioc_append_task(IOContext *ioc, Task *t)
+{
+  auto *const head = static_cast<Task*>(ioc->nvme_task_first);
+  auto *const tail = static_cast<Task*>(ioc->nvme_task_last);
+
+  if (tail != nullptr) {
+    tail->next = t;
+  }
+  if (head == nullptr) {
+    ioc->nvme_task_first = t;
+  }
+  ioc->nvme_task_last = t;
+  ++ioc->num_pending;
+}
+
+
static void write_split(
NVMEDevice *dev,
uint64_t off,
IOContext *ioc)
{
uint64_t remain_len = bl.length(), begin = 0, write_size;
- Task *t, *first, *last;
+ Task *t;
// This value may need to be got from configuration later.
uint64_t split_size = 131072; // 128KB.
bl.splice(0, write_size, &t->bl);
remain_len -= write_size;
t->ctx = ioc;
- first = static_cast<Task*>(ioc->nvme_task_first);
- last = static_cast<Task*>(ioc->nvme_task_last);
- if (last)
- last->next = t;
- if (!first)
- ioc->nvme_task_first = t;
- ioc->nvme_task_last = t;
- ++ioc->num_pending;
+ ioc_append_task(ioc, t);
begin += write_size;
}
}
+/// Queue READ tasks on ioc covering [aligned_off, aligned_off + aligned_len).
+///
+/// The aligned range is cut into chunks of at most split_size bytes and one
+/// task per chunk is appended to ioc via ioc_append_task(). If primary is
+/// non-null and the whole range fits into a single chunk, primary itself is
+/// queued; otherwise every chunk gets a freshly allocated Task that is
+/// constructed with primary as its parent (which may be null).
+///
+/// Each task's fill_cb calls copy_to_buf() so that, across all chunks,
+/// orig_len bytes starting at orig_off end up in buf: the first chunk skips
+/// (orig_off - aligned_off) bytes, later chunks start at offset 0, and buf
+/// advances by the amount assigned to each chunk.
+static void make_read_tasks(
+    NVMEDevice *dev,
+    uint64_t aligned_off,
+    IOContext *ioc, char *buf, uint64_t aligned_len, Task *primary,
+    uint64_t orig_off, uint64_t orig_len)
+{
+  // Upper bound on bytes per task; may need to come from configuration later.
+  const uint64_t split_size = 131072; // 128KB.
+  const auto aligned_end = aligned_off + aligned_len;
+
+  // Only the first chunk can begin before the caller's requested offset.
+  uint64_t skip = orig_off - aligned_off;
+  uint64_t bytes_left = orig_len;
+
+  auto chunk_off = aligned_off;
+  while (chunk_off < aligned_end) {
+    const auto chunk_len = std::min(aligned_end - chunk_off, split_size);
+    const auto copy_len = std::min(bytes_left, chunk_len - skip);
+
+    // Reuse the primary when one task can cover the whole aligned range;
+    // otherwise allocate a task for this chunk.
+    Task *t = (primary && (aligned_len <= split_size))
+      ? primary
+      : new Task(dev, IOCommand::READ_COMMAND, chunk_off, chunk_len, 0, primary);
+    t->ctx = ioc;
+
+    // TODO: if the upper layer allocated memory with a known physical
+    // address, this copy could be avoided.
+    t->fill_cb = [buf, t, skip, copy_len] {
+      t->copy_to_buf(buf, skip, copy_len);
+    };
+
+    ioc_append_task(ioc, t);
+
+    bytes_left -= copy_len;
+    buf += copy_len;
+    skip = 0;
+    chunk_off += split_size;
+  }
+}
+
+
int NVMEDevice::aio_write(
uint64_t off,
bufferlist &bl,
Task *t = new Task(this, IOCommand::READ_COMMAND, off, len, 1);
bufferptr p = buffer::create_small_page_aligned(len);
int r = 0;
- t->ctx = ioc;
char *buf = p.c_str();
- t->fill_cb = [buf, t]() {
- t->copy_to_buf(buf, 0, t->len);
- };
- ++ioc->num_pending;
- ioc->nvme_task_first = t;
+ ceph_assert(ioc->nvme_task_first == nullptr);
+ ceph_assert(ioc->nvme_task_last == nullptr);
+ make_read_tasks(this, off, ioc, buf, len, t, off, len);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
aio_submit(ioc);
ioc->aio_wait();
{
dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
ceph_assert(is_valid_io(off, len));
-
- Task *t = new Task(this, IOCommand::READ_COMMAND, off, len);
-
bufferptr p = buffer::create_small_page_aligned(len);
pbl->append(p);
- t->ctx = ioc;
char* buf = p.c_str();
- t->fill_cb = [buf, t]() {
- t->copy_to_buf(buf, 0, t->len);
- };
-
- Task *first = static_cast<Task*>(ioc->nvme_task_first);
- Task *last = static_cast<Task*>(ioc->nvme_task_last);
- if (last)
- last->next = t;
- if (!first)
- ioc->nvme_task_first = t;
- ioc->nvme_task_last = t;
- ++ioc->num_pending;
+ make_read_tasks(this, off, ioc, buf, len, NULL, off, len);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
return 0;
}
IOContext ioc(g_ceph_context, nullptr);
Task *t = new Task(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
int r = 0;
- t->ctx = &ioc;
- t->fill_cb = [buf, t, off, len]() {
- t->copy_to_buf(buf, off-t->offset, len);
- };
- ++ioc.num_pending;
- ioc.nvme_task_first = t;
+ make_read_tasks(this, aligned_off, &ioc, buf, aligned_len, t, off, len);
aio_submit(&ioc);
ioc.aio_wait();