io_request.nseg = 0;
}
- void copy_to_buf(const bufferptr& p, uint64_t off, uint64_t len) {
- copy_to_buf((char *)p.c_str(), off, len);
- bl.append(std::move(p));
- }
-
void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
uint64_t copied = 0;
uint64_t left = len;
ioc->num_pending -= pending;
assert(ioc->num_pending.load() == 0); // we should be only thread doing this
// Only need to push the first entry
- if(queue_id == -1)
- queue_id = ceph_gettid();
- driver->get_queue(queue_id)->queue_task(t, pending);
+ if (queue_id == -1)
+ queue_id = ceph_gettid();
ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
+ driver->get_queue(queue_id)->queue_task(t, pending);
+ }
+}
+
+static void write_split(
+ NVMEDevice *dev,
+ uint64_t off,
+ bufferlist &bl,
+ IOContext *ioc)
+{
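+ // Split one large write into split_size chunks; each chunk becomes a Task
+ // chained on the IOContext so it can be issued later via aio_submit().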
+ uint64_t remain_len = bl.length(), begin = 0, write_size;
+ Task *t, *first, *last;
+ // This value may need to come from a configuration option later.
+ uint64_t split_size = 131072; // 128KB.
+
+ while (remain_len > 0) {
+ write_size = std::min(remain_len, split_size);
+ t = new Task(dev, IOCommand::WRITE_COMMAND, off + begin, write_size);
+ // TODO: if the upper layer allocates memory with a known physical address,
+ // we can avoid this copy
+ bl.splice(0, write_size, &t->bl);
+ remain_len -= write_size;
+ t->ctx = ioc;
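+ // append the task to the IOContext's singly linked pending list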
+ first = static_cast<Task*>(ioc->nvme_task_first);
+ last = static_cast<Task*>(ioc->nvme_task_last);
+ if (last)
+ last->next = t;
+ if (!first)
+ ioc->nvme_task_first = t;
+ ioc->nvme_task_last = t;
+ ++ioc->num_pending;
+ begin += write_size;
}
}
<< " buffered " << buffered << dendl;
assert(is_valid_io(off, len));
- Task *t = new Task(this, IOCommand::WRITE_COMMAND, off, len);
-
- // TODO: if upper layer alloc memory with known physical address,
- // we can reduce this copy
- t->bl = std::move(bl);
-
- t->ctx = ioc;
- Task *first = static_cast<Task*>(ioc->nvme_task_first);
- Task *last = static_cast<Task*>(ioc->nvme_task_last);
- if (last)
- last->next = t;
- if (!first)
- ioc->nvme_task_first = t;
- ioc->nvme_task_last = t;
- ++ioc->num_pending;
-
+ write_split(this, off, bl, ioc);
dout(5) << __func__ << " " << off << "~" << len << dendl;
return 0;
assert(off + len <= size);
IOContext ioc(cct, NULL);
- Task *t = new Task(this, IOCommand::WRITE_COMMAND, off, len);
-
- // TODO: if upper layer alloc memory with known physical address,
- // we can reduce this copy
- t->bl = std::move(bl);
- t->ctx = &ioc;
- if(queue_id == -1)
- queue_id = ceph_gettid();
- ++ioc.num_running;
- driver->get_queue(queue_id)->queue_task(t);
-
+ write_split(this, off, bl, &ioc);
dout(5) << __func__ << " " << off << "~" << len << dendl;
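+ // submit the split tasks; aio_wait() below blocks until they all complete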
+ aio_submit(&ioc);
ioc.aio_wait();
return 0;
}
bufferptr p = buffer::create_page_aligned(len);
int r = 0;
t->ctx = ioc;
- pbl->append(std::move(t->bl));
- t->fill_cb = [p, t]() {
- t->copy_to_buf(p, 0, t->len);
+ char *buf = p.c_str();
+ t->fill_cb = [buf, t]() {
+ t->copy_to_buf(buf, 0, t->len);
};
+
++ioc->num_running;
if(queue_id == -1)
queue_id = ceph_gettid();
Task *t = new Task(this, IOCommand::READ_COMMAND, off, len);
bufferptr p = buffer::create_page_aligned(len);
- pbl->append(std::move(t->bl));
+ pbl->append(p);
t->ctx = ioc;
- t->fill_cb = [p, t]() {
- t->copy_to_buf(p, 0, t->len);
+ char *buf = p.c_str();
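+ // buf stays valid because pbl holds a reference to p; fill_cb copies the
+ // completed read into it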
+ t->fill_cb = [buf, t]() {
+ t->copy_to_buf(buf, 0, t->len);
};
Task *first = static_cast<Task*>(ioc->nvme_task_first);