]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: refactor synchronous I/O
authorJosh Durgin <josh.durgin@inktank.com>
Tue, 24 Jul 2012 00:28:49 +0000 (17:28 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Mon, 30 Jul 2012 18:19:53 +0000 (11:19 -0700)
Write in terms of the asynchronous functions, so all the logic
is not duplicated. Now there's only a single point where each
operation needs to change for layering.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
src/librbd/internal.cc
src/librbd/internal.h

index 2fc0ae0dbf8d04ced9b47ceaa20e2c3b65e86555..538bd4e3b7f8c4102085950a8bbb32d53ed45a2c 100644 (file)
@@ -1583,6 +1583,12 @@ namespace librbd {
                                  lock_holder, cookie);
   }
 
+  void rbd_ctx_cb(completion_t cb, void *arg)
+  {
+    Context *ctx = reinterpret_cast<Context *>(arg);
+    AioCompletion *comp = reinterpret_cast<AioCompletion *>(cb);
+    ctx->complete(comp->get_return_value());
+  }
 
   int64_t read_iterate(ImageCtx *ictx, uint64_t off, size_t len,
                       int (*cb)(uint64_t, size_t, const char *, void *),
@@ -1601,7 +1607,6 @@ namespace librbd {
     if (r < 0)
       return r;
 
-    int64_t ret;
     int64_t total_read = 0;
     ictx->lock.Lock();
     uint64_t start_block = get_block_num(ictx->order, off);
@@ -1612,51 +1617,49 @@ namespace librbd {
 
     start_time = ceph_clock_now(ictx->cct);
     for (uint64_t i = start_block; i <= end_block; i++) {
-      bufferlist bl;
-      ictx->lock.Lock();
-      string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
-      uint64_t block_ofs = get_block_ofs(ictx->order, off + total_read);
-      ictx->lock.Unlock();
+      uint64_t total_off = off + total_read;
+      uint64_t block_ofs = get_block_ofs(ictx->order, total_off);
       uint64_t read_len = min(block_size - block_ofs, left);
-      uint64_t bytes_read;
-
-      if (ictx->object_cacher) {
-       r = ictx->read_from_cache(oid, &bl, read_len, block_ofs);
-       if (r < 0 && r != -ENOENT)
-         return r;
-
-       if (r == -ENOENT)
-         r = cb(total_read, read_len, NULL, arg);
-       else
-         r = cb(total_read, read_len, bl.c_str(), arg);
+      buffer::ptr bp(read_len);
+      bufferlist bl;
+      bl.append(bp);
 
-       bytes_read = read_len; // ObjectCacher pads with zeroes at end of object
-      } else {
-       map<uint64_t, uint64_t> m;
-       r = ictx->data_ctx.sparse_read(oid, m, bl, read_len, block_ofs);
-       if (r < 0 && r == -ENOENT)
-         r = 0;
-       if (r < 0) {
-         return r;
-       }
+      Mutex mylock("IoCtxImpl::write::mylock");
+      Cond cond;
+      bool done;
+      int ret;
 
-       r = handle_sparse_read(ictx->cct, bl, block_ofs, m, total_read,
-                              read_len, cb, arg);
-       bytes_read = r;
-      }
+      Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
+      AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
+      r = aio_read(ictx, total_off, read_len, bl.c_str(), c);
       if (r < 0) {
+       c->release();
+       delete ctx;
        return r;
       }
-      total_read += bytes_read;
-      left -= bytes_read;
+
+      mylock.Lock();
+      while (!done)
+       cond.Wait(mylock);
+      mylock.Unlock();
+
+      c->release();
+      if (ret < 0)
+       return ret;
+
+      r = cb(total_read, ret, bl.c_str(), arg);
+      if (r < 0)
+       return r;
+
+      total_read += ret;
+      left -= ret;
     }
-    ret = total_read;
 
     elapsed = ceph_clock_now(ictx->cct) - start_time;
     ictx->perfcounter->finc(l_librbd_rd_latency, elapsed);
     ictx->perfcounter->inc(l_librbd_rd);
     ictx->perfcounter->inc(l_librbd_rd_bytes, len);
-    return ret;
+    return total_read;
   }
 
   int simple_read_cb(uint64_t ofs, size_t len, const char *buf, void *arg)
@@ -1670,7 +1673,6 @@ namespace librbd {
     return 0;
   }
 
-
   ssize_t read(ImageCtx *ictx, uint64_t ofs, size_t len, char *buf)
   {
     return read_iterate(ictx, ofs, len, simple_read_cb, buf);
@@ -1682,56 +1684,35 @@ namespace librbd {
     ldout(ictx->cct, 20) << "write " << ictx << " off = " << off << " len = "
                         << len << dendl;
 
-    if (!len)
-      return 0;
-
-    int r = ictx_check(ictx);
-    if (r < 0)
-      return r;
-
-    r = check_io(ictx, off, len);
-    if (r < 0)
+    start_time = ceph_clock_now(ictx->cct);
+    Mutex mylock("librbd::write::mylock");
+    Cond cond;
+    bool done;
+    int ret;
+
+    Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
+    AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
+    int r = aio_write(ictx, off, len, buf, c);
+    if (r < 0) {
+      c->release();
+      delete ctx;
       return r;
+    }
 
-    size_t total_write = 0;
-    ictx->lock.Lock();
-    uint64_t start_block = get_block_num(ictx->order, off);
-    uint64_t end_block = get_block_num(ictx->order, off + len - 1);
-    uint64_t block_size = get_block_size(ictx->order);
-    snapid_t snap = ictx->snap_id;
-    ictx->lock.Unlock();
-    uint64_t left = len;
-
-    if (snap != CEPH_NOSNAP)
-      return -EROFS;
+    mylock.Lock();
+    while (!done)
+      cond.Wait(mylock);
+    mylock.Unlock();
 
-    start_time = ceph_clock_now(ictx->cct);
-    for (uint64_t i = start_block; i <= end_block; i++) {
-      bufferlist bl;
-      ictx->lock.Lock();
-      string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
-      uint64_t block_ofs = get_block_ofs(ictx->order, off + total_write);
-      ictx->lock.Unlock();
-      uint64_t write_len = min(block_size - block_ofs, left);
-      bl.append(buf + total_write, write_len);
-      if (ictx->object_cacher) {
-       ictx->write_to_cache(oid, bl, write_len, block_ofs);
-      } else {
-       r = ictx->data_ctx.write(oid, bl, write_len, block_ofs);
-       if (r < 0)
-         return r;
-       if ((uint64_t)r != write_len)
-         return -EIO;
-      }
-      total_write += write_len;
-      left -= write_len;
-    }
+    c->release();
+    if (ret < 0)
+      return ret;
 
     elapsed = ceph_clock_now(ictx->cct) - start_time;
     ictx->perfcounter->finc(l_librbd_wr_latency, elapsed);
     ictx->perfcounter->inc(l_librbd_wr);
-    ictx->perfcounter->inc(l_librbd_wr_bytes, total_write);
-    return total_write;
+    ictx->perfcounter->inc(l_librbd_wr_bytes, len);
+    return len;
   }
 
   int discard(ImageCtx *ictx, uint64_t off, uint64_t len)
@@ -1740,64 +1721,35 @@ namespace librbd {
     ldout(ictx->cct, 20) << "discard " << ictx << " off = " << off << " len = "
                         << len << dendl;
 
-    if (!len)
-      return 0;
-
-    int r = ictx_check(ictx);
-    if (r < 0)
-      return r;
-
-    r = check_io(ictx, off, len);
-    if (r < 0)
-      return r;
-
-    size_t total_write = 0;
-    ictx->lock.Lock();
-    uint64_t start_block = get_block_num(ictx->order, off);
-    uint64_t end_block = get_block_num(ictx->order, off + len - 1);
-    uint64_t block_size = get_block_size(ictx->order);
-    ictx->lock.Unlock();
-    uint64_t left = len;
-
-    vector<ObjectExtent> v;
-    if (ictx->object_cacher)
-      v.reserve(end_block - start_block + 1);
-
     start_time = ceph_clock_now(ictx->cct);
-    for (uint64_t i = start_block; i <= end_block; i++) {
-      ictx->lock.Lock();
-      string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
-      uint64_t block_ofs = get_block_ofs(ictx->order, off + total_write);
-      ictx->lock.Unlock();
-      uint64_t write_len = min(block_size - block_ofs, left);
-
-      if (ictx->object_cacher) {
-       v.push_back(ObjectExtent(oid, block_ofs, write_len));
-       v.back().oloc.pool = ictx->data_ctx.get_id();
-      }
-
-      librados::ObjectWriteOperation write_op;
-      if (block_ofs == 0 && write_len == block_size)
-       write_op.remove();
-      else if (write_len + block_ofs == block_size)
-       write_op.truncate(block_ofs);
-      else
-       write_op.zero(block_ofs, write_len);
-      r = ictx->data_ctx.operate(oid, &write_op);
-      if (r < 0)
-       return r;
-      total_write += write_len;
-      left -= write_len;
+    Mutex mylock("librbd::discard::mylock");
+    Cond cond;
+    bool done;
+    int ret;
+
+    Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
+    AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
+    int r = aio_discard(ictx, off, len, c);
+    if (r < 0) {
+      c->release();
+      delete ctx;
+      return r;
     }
 
-    if (ictx->object_cacher)
-      ictx->object_cacher->discard_set(ictx->object_set, v);
+    mylock.Lock();
+    while (!done)
+      cond.Wait(mylock);
+    mylock.Unlock();
+
+    c->release();
+    if (ret < 0)
+      return ret;
 
     elapsed = ceph_clock_now(ictx->cct) - start_time;
     ictx->perfcounter->inc(l_librbd_discard_latency, elapsed);
     ictx->perfcounter->inc(l_librbd_discard);
-    ictx->perfcounter->inc(l_librbd_discard_bytes, total_write);
-    return total_write;
+    ictx->perfcounter->inc(l_librbd_discard_bytes, len);
+    return len;
   }
 
   ssize_t handle_sparse_read(CephContext *cct,
@@ -2005,6 +1957,7 @@ namespace librbd {
     if (r < 0)
       return r;
 
+    // TODO: check for snap
     size_t total_write = 0;
     ictx->lock.Lock();
     uint64_t start_block = get_block_num(ictx->order, off);
@@ -2166,4 +2119,11 @@ namespace librbd {
     c->set_complete_cb(cb_arg, cb_complete);
     return c;
   }
+
+  AioCompletion *aio_create_completion_internal(void *cb_arg,
+                                               callback_t cb_complete) {
+    AioCompletion *c = aio_create_completion(cb_arg, cb_complete);
+    c->rbd_comp = c;
+    return c;
+  }
 }
index e5e5622b8d76414e6d9c8ff267e14366350b3b11..2cdfe080cad495603515dfb614093d57a6bc1474 100644 (file)
@@ -180,6 +180,8 @@ namespace librbd {
 
   AioCompletion *aio_create_completion();
   AioCompletion *aio_create_completion(void *cb_arg, callback_t cb_complete);
+  AioCompletion *aio_create_completion_internal(void *cb_arg,
+                                               callback_t cb_complete);
 
   // raw callbacks
   int simple_read_cb(uint64_t ofs, size_t len, const char *buf, void *arg);