lock_holder, cookie);
}
+ void rbd_ctx_cb(completion_t cb, void *arg)
+ {
+ Context *ctx = reinterpret_cast<Context *>(arg);
+ AioCompletion *comp = reinterpret_cast<AioCompletion *>(cb);
+ ctx->complete(comp->get_return_value());
+ }
int64_t read_iterate(ImageCtx *ictx, uint64_t off, size_t len,
int (*cb)(uint64_t, size_t, const char *, void *),
if (r < 0)
return r;
- int64_t ret;
int64_t total_read = 0;
ictx->lock.Lock();
uint64_t start_block = get_block_num(ictx->order, off);
start_time = ceph_clock_now(ictx->cct);
for (uint64_t i = start_block; i <= end_block; i++) {
- bufferlist bl;
- ictx->lock.Lock();
- string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
- uint64_t block_ofs = get_block_ofs(ictx->order, off + total_read);
- ictx->lock.Unlock();
+ uint64_t total_off = off + total_read;
+ uint64_t block_ofs = get_block_ofs(ictx->order, total_off);
uint64_t read_len = min(block_size - block_ofs, left);
- uint64_t bytes_read;
-
- if (ictx->object_cacher) {
- r = ictx->read_from_cache(oid, &bl, read_len, block_ofs);
- if (r < 0 && r != -ENOENT)
- return r;
-
- if (r == -ENOENT)
- r = cb(total_read, read_len, NULL, arg);
- else
- r = cb(total_read, read_len, bl.c_str(), arg);
+ buffer::ptr bp(read_len);
+ bufferlist bl;
+ bl.append(bp);
- bytes_read = read_len; // ObjectCacher pads with zeroes at end of object
- } else {
- map<uint64_t, uint64_t> m;
- r = ictx->data_ctx.sparse_read(oid, m, bl, read_len, block_ofs);
- if (r < 0 && r == -ENOENT)
- r = 0;
- if (r < 0) {
- return r;
- }
+ Mutex mylock("IoCtxImpl::write::mylock");
+ Cond cond;
+ bool done;
+ int ret;
- r = handle_sparse_read(ictx->cct, bl, block_ofs, m, total_read,
- read_len, cb, arg);
- bytes_read = r;
- }
+ Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
+ AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
+ r = aio_read(ictx, total_off, read_len, bl.c_str(), c);
if (r < 0) {
+ c->release();
+ delete ctx;
return r;
}
- total_read += bytes_read;
- left -= bytes_read;
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ c->release();
+ if (ret < 0)
+ return ret;
+
+ r = cb(total_read, ret, bl.c_str(), arg);
+ if (r < 0)
+ return r;
+
+ total_read += ret;
+ left -= ret;
}
- ret = total_read;
elapsed = ceph_clock_now(ictx->cct) - start_time;
ictx->perfcounter->finc(l_librbd_rd_latency, elapsed);
ictx->perfcounter->inc(l_librbd_rd);
ictx->perfcounter->inc(l_librbd_rd_bytes, len);
- return ret;
+ return total_read;
}
int simple_read_cb(uint64_t ofs, size_t len, const char *buf, void *arg)
return 0;
}
-
ssize_t read(ImageCtx *ictx, uint64_t ofs, size_t len, char *buf)
{
return read_iterate(ictx, ofs, len, simple_read_cb, buf);
ldout(ictx->cct, 20) << "write " << ictx << " off = " << off << " len = "
<< len << dendl;
- if (!len)
- return 0;
-
- int r = ictx_check(ictx);
- if (r < 0)
- return r;
-
- r = check_io(ictx, off, len);
- if (r < 0)
+ start_time = ceph_clock_now(ictx->cct);
+ Mutex mylock("librbd::write::mylock");
+ Cond cond;
+ bool done;
+ int ret;
+
+ Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
+ AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
+ int r = aio_write(ictx, off, len, buf, c);
+ if (r < 0) {
+ c->release();
+ delete ctx;
return r;
+ }
- size_t total_write = 0;
- ictx->lock.Lock();
- uint64_t start_block = get_block_num(ictx->order, off);
- uint64_t end_block = get_block_num(ictx->order, off + len - 1);
- uint64_t block_size = get_block_size(ictx->order);
- snapid_t snap = ictx->snap_id;
- ictx->lock.Unlock();
- uint64_t left = len;
-
- if (snap != CEPH_NOSNAP)
- return -EROFS;
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
- start_time = ceph_clock_now(ictx->cct);
- for (uint64_t i = start_block; i <= end_block; i++) {
- bufferlist bl;
- ictx->lock.Lock();
- string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
- uint64_t block_ofs = get_block_ofs(ictx->order, off + total_write);
- ictx->lock.Unlock();
- uint64_t write_len = min(block_size - block_ofs, left);
- bl.append(buf + total_write, write_len);
- if (ictx->object_cacher) {
- ictx->write_to_cache(oid, bl, write_len, block_ofs);
- } else {
- r = ictx->data_ctx.write(oid, bl, write_len, block_ofs);
- if (r < 0)
- return r;
- if ((uint64_t)r != write_len)
- return -EIO;
- }
- total_write += write_len;
- left -= write_len;
- }
+ c->release();
+ if (ret < 0)
+ return ret;
elapsed = ceph_clock_now(ictx->cct) - start_time;
ictx->perfcounter->finc(l_librbd_wr_latency, elapsed);
ictx->perfcounter->inc(l_librbd_wr);
- ictx->perfcounter->inc(l_librbd_wr_bytes, total_write);
- return total_write;
+ ictx->perfcounter->inc(l_librbd_wr_bytes, len);
+ return len;
}
int discard(ImageCtx *ictx, uint64_t off, uint64_t len)
ldout(ictx->cct, 20) << "discard " << ictx << " off = " << off << " len = "
<< len << dendl;
- if (!len)
- return 0;
-
- int r = ictx_check(ictx);
- if (r < 0)
- return r;
-
- r = check_io(ictx, off, len);
- if (r < 0)
- return r;
-
- size_t total_write = 0;
- ictx->lock.Lock();
- uint64_t start_block = get_block_num(ictx->order, off);
- uint64_t end_block = get_block_num(ictx->order, off + len - 1);
- uint64_t block_size = get_block_size(ictx->order);
- ictx->lock.Unlock();
- uint64_t left = len;
-
- vector<ObjectExtent> v;
- if (ictx->object_cacher)
- v.reserve(end_block - start_block + 1);
-
start_time = ceph_clock_now(ictx->cct);
- for (uint64_t i = start_block; i <= end_block; i++) {
- ictx->lock.Lock();
- string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
- uint64_t block_ofs = get_block_ofs(ictx->order, off + total_write);
- ictx->lock.Unlock();
- uint64_t write_len = min(block_size - block_ofs, left);
-
- if (ictx->object_cacher) {
- v.push_back(ObjectExtent(oid, block_ofs, write_len));
- v.back().oloc.pool = ictx->data_ctx.get_id();
- }
-
- librados::ObjectWriteOperation write_op;
- if (block_ofs == 0 && write_len == block_size)
- write_op.remove();
- else if (write_len + block_ofs == block_size)
- write_op.truncate(block_ofs);
- else
- write_op.zero(block_ofs, write_len);
- r = ictx->data_ctx.operate(oid, &write_op);
- if (r < 0)
- return r;
- total_write += write_len;
- left -= write_len;
+ Mutex mylock("librbd::discard::mylock");
+ Cond cond;
+ bool done;
+ int ret;
+
+ Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
+ AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
+ int r = aio_discard(ictx, off, len, c);
+ if (r < 0) {
+ c->release();
+ delete ctx;
+ return r;
}
- if (ictx->object_cacher)
- ictx->object_cacher->discard_set(ictx->object_set, v);
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+
+ c->release();
+ if (ret < 0)
+ return ret;
elapsed = ceph_clock_now(ictx->cct) - start_time;
ictx->perfcounter->inc(l_librbd_discard_latency, elapsed);
ictx->perfcounter->inc(l_librbd_discard);
- ictx->perfcounter->inc(l_librbd_discard_bytes, total_write);
- return total_write;
+ ictx->perfcounter->inc(l_librbd_discard_bytes, len);
+ return len;
}
ssize_t handle_sparse_read(CephContext *cct,
if (r < 0)
return r;
+ // TODO: check for snap
size_t total_write = 0;
ictx->lock.Lock();
uint64_t start_block = get_block_num(ictx->order, off);
c->set_complete_cb(cb_arg, cb_complete);
return c;
}
+
+ AioCompletion *aio_create_completion_internal(void *cb_arg,
+ callback_t cb_complete) {
+ AioCompletion *c = aio_create_completion(cb_arg, cb_complete);
+ c->rbd_comp = c;
+ return c;
+ }
}