]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Parallelize RBD import/export 2390/head
authorJason Dillaman <dillaman@redhat.com>
Tue, 2 Sep 2014 20:43:38 +0000 (16:43 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 3 Sep 2014 17:27:32 +0000 (13:27 -0400)
Use librbd aio_read/_write calls where possible to improve speed.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/rbd.cc

index 9a411ec7a73050519a5e8cd72fd43c501612a5c8..12890885d0f138c6537b471692484b7ecc2015ef 100644 (file)
@@ -29,6 +29,7 @@
 #include "include/compat.h"
 #include "common/blkdev.h"
 
+#include <boost/scope_exit.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <errno.h>
 #include <iostream>
@@ -45,6 +46,7 @@
 #include "include/util.h"
 
 #include "common/Formatter.h"
+#include "common/Throttle.h"
 
 #if defined(__linux__)
 #include <linux/fs.h>
@@ -982,92 +984,125 @@ struct ExportContext {
   {}
 };
 
-static int export_read_cb(uint64_t ofs, size_t len, const char *buf, void *arg)
+class AioExportContext : public Context
 {
-  ssize_t ret;
-  ExportContext *ec = static_cast<ExportContext *>(arg);
-  int fd = ec->fd;
-  static char *localbuf = NULL;
-  static size_t maplen = 0;
-
-  if (fd == 1) {
-    if (!buf) {
-      // can't seek stdout; need actual data to write
-      if (maplen < len) {
-       // never mapped, or need to map larger
-       int r;
-       if (localbuf != NULL){
-         if ((r = munmap(localbuf, len)) < 0) {
-           cerr << "rbd: error " << r << "munmap'ing buffer" << std::endl;
-           return errno;
-         }
-       }
+public:
+  AioExportContext(SimpleThrottle &simple_throttle, librbd::Image &image,
+                   uint64_t offset, uint64_t length, int fd)
+    : m_aio_completion(
+        new librbd::RBD::AioCompletion(this, &AioExportContext::aio_callback)),
+      m_throttle(simple_throttle),
+      m_offset(offset),
+      m_fd(fd)
+  {
+    m_throttle.start_op();
+    int r = image.aio_read(offset, length, m_bufferlist, m_aio_completion);
+    if (r < 0) {
+      cerr << "rbd: error requesting read from source image" << std::endl;
+      m_throttle.end_op(r);
+    }
+  }
 
-       maplen = len;
-       localbuf = (char *)mmap(NULL, maplen, PROT_READ,
-                               MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
-       if (localbuf == MAP_FAILED) {
-         cerr << "rbd: MAP_FAILED mmap'ing buffer for zero writes"
-              << std::endl;
-         return -ENOMEM;
-       }
-      }
-      ret = write(fd, localbuf, len);
-    } else {
-      ret = write(fd, buf, len);
+  virtual ~AioExportContext()
+  {
+    m_aio_completion->release();
+  }
+
+  virtual void finish(int r)
+  {
+    BOOST_SCOPE_EXIT((&m_throttle) (&r))
+    {
+      m_throttle.end_op(r);
+    } BOOST_SCOPE_EXIT_END
+
+    if (r < 0) {
+      cerr << "rbd: error reading from source image at offset "
+           << m_offset << ": " << cpp_strerror(r) << std::endl;
+      return;
     }
-  } else {             // not stdout
-    if (!buf || buf_is_zero(buf, len)) {
-      /* a hole */
-      return 0;
+
+    assert(m_bufferlist.length() == static_cast<size_t>(r));
+    if (m_fd != STDOUT_FILENO) {
+      if (m_bufferlist.is_zero()) {
+        return;
+      }
+
+      r = lseek64(m_fd, m_offset, SEEK_SET);
+      if (static_cast<uint64_t>(r) != m_offset) {
+        cerr << "rbd: error seeking destination image to offset "
+             << m_offset << std::endl;
+        r = -errno;
+        return;
+      }
     }
 
-    ret = lseek64(fd, ofs, SEEK_SET);
-    if (ret < 0)
-      return -errno;
-    ret = write(fd, buf, len);
-    ec->pc.update_progress(ofs, ec->totalsize);
+    r = m_bufferlist.write_fd(m_fd);
+    if (r < 0) {
+      cerr << "rbd: error writing to destination image at offset "
+           << m_offset << std::endl;
+    }
   }
 
-  if (ret < 0)
-    return -errno;
-
-  return 0;
-}
+  static void aio_callback(librbd::completion_t completion, void *arg)
+  {
+    librbd::RBD::AioCompletion *aio_completion =
+      reinterpret_cast<librbd::RBD::AioCompletion*>(completion);
+    AioExportContext *export_context = reinterpret_cast<AioExportContext*>(arg);
+    export_context->complete(aio_completion->get_return_value());
+  }
+
+private:
+  librbd::RBD::AioCompletion *m_aio_completion;
+  SimpleThrottle &m_throttle;
+  bufferlist m_bufferlist;
+  uint64_t m_offset;
+  int m_fd;
+};
 
 static int do_export(librbd::Image& image, const char *path)
 {
-  int64_t r;
   librbd::image_info_t info;
-  int fd;
-
-  r = image.stat(info, sizeof(info));
+  int64_t r = image.stat(info, sizeof(info));
   if (r < 0)
     return r;
 
-  if (strcmp(path, "-") == 0)
-    fd = 1;
-  else
+  int fd;
+  int max_concurrent_ops;
+  bool to_stdout = (strcmp(path, "-") == 0);
+  if (to_stdout) {
+    fd = STDOUT_FILENO;
+    max_concurrent_ops = 1;
+  } else {
+    max_concurrent_ops = max(g_conf->rbd_concurrent_management_ops, 1);
     fd = open(path, O_WRONLY | O_CREAT | O_EXCL, 0644);
-  if (fd < 0)
-    return -errno;
+    if (fd < 0) {
+      return -errno;
+    }
+  }
 
-  ExportContext ec(&image, fd, info.size);
-  r = image.read_iterate2(0, info.size, export_read_cb, (void *)&ec);
-  if (r < 0)
-    goto out;
+  MyProgressContext pc("Exporting image");
 
-  if (fd != 1)
-    r = ftruncate(fd, info.size);
-  if (r < 0)
-    goto out;
+  SimpleThrottle throttle(max_concurrent_ops, false);
+  uint64_t period = image.get_stripe_count() * (1ull << info.order);
+  for (uint64_t offset = 0; offset < info.size; offset += period) {
+    uint64_t length = min(period, info.size - offset);
+    new AioExportContext(throttle, image, offset, length, fd);
+    pc.update_progress(offset, info.size);
+  }
 
- out:
-  close(fd);
-  if (r < 0)
-    ec.pc.fail();
-  else
-    ec.pc.finish();
+  r = throttle.wait_for_ret();
+  if (!to_stdout) {
+    if (r >= 0) {
+      r = ftruncate(fd, info.size);
+    }
+    close(fd);
+  }
+
+  if (r < 0) {
+    pc.fail();
+  } else {
+    pc.finish();
+  }
   return r;
 }
 
@@ -1288,6 +1323,53 @@ done_img:
   update_snap_name(*new_img, snap);
 }
 
+class AioImportContext : public Context
+{
+public:
+  AioImportContext(SimpleThrottle &simple_throttle, librbd::Image &image,
+                   bufferlist &bl, uint64_t offset)
+    : m_throttle(simple_throttle),
+      m_aio_completion(
+        new librbd::RBD::AioCompletion(this, &AioImportContext::aio_callback)),
+      m_offset(offset)
+  {
+    m_throttle.start_op();
+
+    int r = image.aio_write(m_offset, bl.length(), bl, m_aio_completion);
+    if (r < 0) {
+      cerr << "rbd: error requesting write to destination image" << std::endl;
+      m_throttle.end_op(r);
+    }
+  }
+
+  virtual ~AioImportContext()
+  {
+    m_aio_completion->release();
+  }
+
+  virtual void finish(int r)
+  {
+    if (r < 0) {
+      cerr << "rbd: error writing to destination image at offset "
+           << m_offset << ": " << cpp_strerror(r) << std::endl;
+    }
+    m_throttle.end_op(r);
+  }
+
+  static void aio_callback(librbd::completion_t completion, void *arg)
+  {
+    librbd::RBD::AioCompletion *aio_completion =
+      reinterpret_cast<librbd::RBD::AioCompletion*>(completion);
+    AioImportContext *import_context = reinterpret_cast<AioImportContext*>(arg);
+    import_context->complete(aio_completion->get_return_value());
+  }
+
+private:
+  SimpleThrottle &m_throttle;
+  librbd::RBD::AioCompletion *m_aio_completion;
+  uint64_t m_offset;
+};
+
 static int do_import(librbd::RBD &rbd, librados::IoCtx& io_ctx,
                     const char *imgname, int *order, const char *path,
                     int format, uint64_t features, uint64_t size)
@@ -1311,11 +1393,15 @@ static int do_import(librbd::RBD &rbd, librados::IoCtx& io_ctx,
   size_t blklen = 0;           // amount accumulated from reads to fill blk
   librbd::Image image;
 
+  boost::scoped_ptr<SimpleThrottle> throttle;
   bool from_stdin = !strcmp(path, "-");
   if (from_stdin) {
+    throttle.reset(new SimpleThrottle(1, false));
     fd = 0;
     size = 1ULL << *order;
   } else {
+    throttle.reset(new SimpleThrottle(
+      max(g_conf->rbd_concurrent_management_ops, 1), false));
     if ((fd = open(path, O_RDONLY)) < 0) {
       r = -errno;
       cerr << "rbd: error opening " << path << std::endl;
@@ -1383,13 +1469,9 @@ static int do_import(librbd::RBD &rbd, librados::IoCtx& io_ctx,
     // write as much as we got; perhaps less than imgblklen
     // but skip writing zeros to create sparse images
     if (!bl.is_zero()) {
-      r = image.write(image_pos, blklen, bl);
-      if (r < 0) {
-       cerr << "rbd: error writing to image position " << image_pos
-            << std::endl;
-       goto done;
-      }
+      new AioImportContext(*throttle, image, bl, image_pos);
     }
+
     // done with whole block, whether written or not
     image_pos += blklen;
     // if read had returned 0, we're at EOF and should quit
@@ -1398,6 +1480,11 @@ static int do_import(librbd::RBD &rbd, librados::IoCtx& io_ctx,
     blklen = 0;
     reqlen = imgblklen;
   }
+  r = throttle->wait_for_ret();
+  if (r < 0) {
+    goto done;
+  }
+
   if (from_stdin) {
     r = image.resize(image_pos);
     if (r < 0) {