]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd: concurrent v2 image IO during import/import-diff
authorVenky Shankar <vshankar@redhat.com>
Thu, 2 Mar 2017 05:11:56 +0000 (10:41 +0530)
committerVenky Shankar <vshankar@redhat.com>
Wed, 8 Mar 2017 03:02:57 +0000 (08:32 +0530)
Fixes: http://tracker.ceph.com/issues/19034
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/tools/rbd/action/Import.cc

index 795858bd482223f02478894851a9e3d5d8155dbf..7f8aad17beef0f309ddd58f4be4bcc7b68d5db05 100644 (file)
@@ -21,176 +21,345 @@ namespace rbd {
 namespace action {
 namespace import {
 
-int do_import_diff_fd(librbd::Image &image, int fd,
-                      bool no_progress, int format)
+struct ImportDiffContext {
+  librbd::Image *image;
+  int fd;
+  size_t size;
+  utils::ProgressContext pc;
+  OrderedThrottle throttle;
+  uint64_t last_offset;
+
+  ImportDiffContext(librbd::Image *image, int fd, size_t size, bool no_progress)
+    : image(image), fd(fd), size(size), pc("Importing image diff", no_progress),
+      throttle((fd == STDIN_FILENO) ? 1 :
+               max(g_conf->rbd_concurrent_management_ops, 1), false), last_offset(0) {
+  }
+
+  void update_size(size_t new_size)
+  {
+    if (fd == STDIN_FILENO) {
+      size = new_size;
+    }
+  }
+
+  void update_progress(uint64_t off)
+  {
+    if (size) {
+      pc.update_progress(off, size);
+      last_offset = off;
+    }
+  }
+
+  void update_progress()
+  {
+    uint64_t off = last_offset;
+    if (fd != STDIN_FILENO) {
+      off = lseek(fd, 0, SEEK_CUR);
+    }
+
+    update_progress(off);
+  }
+
+  void finish(int r)
+  {
+    if (r < 0) {
+      pc.fail();
+    } else {
+      pc.finish();
+    }
+  }
+};
+
+class C_ImportDiff : public Context {
+public:
+  C_ImportDiff(ImportDiffContext *idiffctx, bufferlist data, uint64_t offset,
+               uint64_t length, bool discard)
+    : m_idiffctx(idiffctx), m_data(data), m_offset(offset), m_length(length),
+      m_discard(discard) {
+    // use block offset (stdin) or import file position to report
+    // progress.
+    if (m_idiffctx->fd == STDIN_FILENO) {
+      m_prog_offset = offset;
+    } else {
+      m_prog_offset  = lseek(m_idiffctx->fd, 0, SEEK_CUR);
+    }
+  }
+
+  int send()
+  {
+    if (m_idiffctx->throttle.pending_error()) {
+      return m_idiffctx->throttle.wait_for_ret();
+    }
+
+    C_OrderedThrottle *ctx = m_idiffctx->throttle.start_op(this);
+    librbd::RBD::AioCompletion *aio_completion =
+      new librbd::RBD::AioCompletion(ctx, &utils::aio_context_callback);
+
+    int r;
+    if (m_discard) {
+      r = m_idiffctx->image->aio_discard(m_offset, m_length, aio_completion);
+    } else {
+      r = m_idiffctx->image->aio_write2(m_offset, m_length, m_data,
+                                        aio_completion, LIBRADOS_OP_FLAG_FADVISE_NOCACHE);
+    }
+
+    if (r < 0) {
+      aio_completion->release();
+      ctx->complete(r);
+    }
+
+    return r;
+  }
+
+  void finish(int r) override
+  {
+    m_idiffctx->update_progress(m_prog_offset);
+    m_idiffctx->throttle.end_op(r);
+  }
+
+private:
+  ImportDiffContext *m_idiffctx;
+  bufferlist m_data;
+  uint64_t m_offset;
+  uint64_t m_length;
+  bool m_discard;
+  uint64_t m_prog_offset;
+};
+
+static int do_image_snap_from(ImportDiffContext *idiffctx)
+{
+  int r;
+  string from;
+  r = utils::read_string(idiffctx->fd, 4096, &from);   // 4k limit to make sure we don't get a garbage string
+  if (r < 0) {
+    return r;
+  }
+
+  bool exists;
+  r = idiffctx->image->snap_exists2(from.c_str(), &exists);
+  if (r < 0) {
+    return r;
+  }
+
+  if (!exists) {
+    std::cerr << "start snapshot '" << from
+              << "' does not exist in the image, aborting" << std::endl;
+    return -EINVAL;
+  }
+
+  idiffctx->update_progress();
+  return 0;
+}
+
+static int do_image_snap_to(ImportDiffContext *idiffctx, std::string *tosnap)
+{
+  int r;
+  string to;
+  r = utils::read_string(idiffctx->fd, 4096, &to);   // 4k limit to make sure we don't get a garbage string
+  if (r < 0) {
+    return r;
+  }
+
+  bool exists;
+  r = idiffctx->image->snap_exists2(to.c_str(), &exists);
+  if (r < 0) {
+    return r;
+  }
+
+  if (exists) {
+    std::cerr << "end snapshot '" << to << "' already exists, aborting" << std::endl;
+    return -EEXIST;
+  }
+
+  *tosnap = to;
+  idiffctx->update_progress();
+
+  return 0;
+}
+
+static int do_image_resize(ImportDiffContext *idiffctx)
+{
+  int r;
+  char buf[sizeof(uint64_t)];
+  uint64_t end_size;
+  r = safe_read_exact(idiffctx->fd, buf, sizeof(buf));
+  if (r < 0) {
+    return r;
+  }
+
+  bufferlist bl;
+  bl.append(buf, sizeof(buf));
+  bufferlist::iterator p = bl.begin();
+  ::decode(end_size, p);
+
+  uint64_t cur_size;
+  idiffctx->image->size(&cur_size);
+  if (cur_size != end_size) {
+    idiffctx->image->resize(end_size);
+  }
+
+  idiffctx->update_size(end_size);
+  idiffctx->update_progress();
+  return 0;
+}
+
+static int do_image_io(ImportDiffContext *idiffctx, bool discard)
+{
+  int r;
+  char buf[16];
+  r = safe_read_exact(idiffctx->fd, buf, sizeof(buf));
+  if (r < 0) {
+    return r;
+  }
+
+  bufferlist bl;
+  bl.append(buf, sizeof(buf));
+  bufferlist::iterator p = bl.begin();
+
+  uint64_t off, len;
+  ::decode(off, p);
+  ::decode(len, p);
+
+  bufferlist data;
+  if (!discard) {
+    bufferptr bp = buffer::create(len);
+    r = safe_read_exact(idiffctx->fd, bp.c_str(), len);
+    if (r < 0) {
+      return r;
+    }
+    data.append(bp);
+  }
+
+  C_ImportDiff *ctx = new C_ImportDiff(idiffctx, data, off, len, discard);
+  return ctx->send();
+}
+
+static int validate_banner(int fd, std::string banner)
 {
   int r;
-  struct stat stat_buf;
-  utils::ProgressContext pc("Importing image diff", no_progress);
-  uint64_t size = 0;
-  uint64_t off = 0;
-  string from, to;
-  string banner = (format == 1 ? utils::RBD_DIFF_BANNER : utils::RBD_DIFF_BANNER_V2);
   char buf[banner.size() + 1];
+  r = safe_read_exact(fd, buf, banner.size());
+  if (r < 0) {
+    return r;
+  }
+
+  buf[banner.size()] = '\0';
+  if (strcmp(buf, banner.c_str())) {
+    std::cerr << "invalid banner '" << buf << "', expected '" << banner << "'" << std::endl;
+    return -EINVAL;
+  }
 
+  return 0;
+}
+
+static int skip_tag(int fd, uint64_t length)
+{
+  int r;
+
+  if (fd == STDIN_FILENO) {
+    // read the appending data out to skip this tag.
+    char buf[4096];
+    uint64_t len = min(length, sizeof(buf));
+    while (len > 0) {
+      r = safe_read_exact(fd, buf, len);
+      if (r < 0)
+        return r;
+      length -= len;
+      len = min(length, sizeof(buf));
+    }
+  } else {
+    // lseek to skip this tag
+    off64_t offs = lseek64(fd, length, SEEK_CUR);
+    if (offs < 0) {
+      return -errno;
+    }
+  }
+
+  return 0;
+}
+
+static int read_tag(int fd, __u8 end_tag, int format, __u8 *tag, uint64_t *readlen)
+{
+  int r;
+  __u8 read_tag;
+
+  r = safe_read_exact(fd, &read_tag, sizeof(read_tag));
+  if (r < 0) {
+    return r;
+  }
+
+  *tag = read_tag;
+  if (read_tag != end_tag && format == 2) {
+    char buf[sizeof(uint64_t)];
+    r = safe_read_exact(fd, buf, sizeof(buf));
+    if (r < 0) {
+      return r;
+    }
+
+    bufferlist bl;
+    bl.append(buf, sizeof(buf));
+    bufferlist::iterator p = bl.begin();
+    ::decode(*readlen, p);
+  }
+
+  return 0;
+}
+
+int do_import_diff_fd(librbd::Image &image, int fd, bool no_progress, int format)
+{
+  int r;
+
+  uint64_t size = 0;
   bool from_stdin = (fd == STDIN_FILENO);
   if (!from_stdin) {
+    struct stat stat_buf;
     r = ::fstat(fd, &stat_buf);
-    if (r < 0)
-      goto done;
+    if (r < 0) {
+      return r;
+    }
     size = (uint64_t)stat_buf.st_size;
   }
 
-  r = safe_read_exact(fd, buf, banner.size());
-  if (r < 0)
-    goto done;
-  buf[banner.size()] = '\0';
-  if (strcmp(buf, banner.c_str())) {
-    std::cerr << "invalid banner '" << buf << "', expected '"
-            << banner << "'" << std::endl;
-    r = -EINVAL;
-    goto done;
+  r = validate_banner(fd, (format == 1 ? utils::RBD_DIFF_BANNER :
+                           utils::RBD_DIFF_BANNER_V2));
+  if (r < 0) {
+    return r;
   }
 
-  while (true) {
+  // begin image import
+  std::string tosnap;
+  ImportDiffContext idiffctx(&image, fd, size, no_progress);
+  while (r == 0) {
     __u8 tag;
     uint64_t length = 0;
-    r = safe_read_exact(fd, &tag, 1);
-    if (r < 0) {
-      goto done;
-    }
 
-    if (tag == RBD_DIFF_END) {
+    r = read_tag(fd, RBD_DIFF_END, format, &tag, &length);
+    if (r < 0 || tag == RBD_DIFF_END) {
       break;
-    } else {
-      if (format == 2) {
-       char buf[8];
-       r = safe_read_exact(fd, buf, 8);
-       if (r < 0) {
-         return r;
-       }
-       bufferlist bl;
-       bl.append(buf, 8);
-       bufferlist::iterator p = bl.begin();
-       ::decode(length, p);
-      }
-
-      if (tag == RBD_DIFF_FROM_SNAP) {
-       r = utils::read_string(fd, 4096, &from);   // 4k limit to make sure we don't get a garbage string
-       if (r < 0)
-         goto done;
-
-       bool exists; 
-       r = image.snap_exists2(from.c_str(), &exists);
-       if (r < 0)
-         goto done;
-
-       if (!exists) {
-         std::cerr << "start snapshot '" << from
-                   << "' does not exist in the image, aborting" << std::endl;
-         r = -EINVAL;
-         goto done;
-       }
-      }
-      else if (tag == RBD_DIFF_TO_SNAP) {
-       r = utils::read_string(fd, 4096, &to);   // 4k limit to make sure we don't get a garbage string
-       if (r < 0)
-         goto done;
-
-       // verify this snap isn't already present
-       bool exists;
-       r = image.snap_exists2(to.c_str(), &exists);
-       if (r < 0)
-         goto done;
-       
-       if (exists) {
-         std::cerr << "end snapshot '" << to
-                   << "' already exists, aborting" << std::endl;
-         r = -EEXIST;
-         goto done;
-       }
-      } else if (tag == RBD_DIFF_IMAGE_SIZE) {
-       uint64_t end_size;
-       char buf[8];
-       r = safe_read_exact(fd, buf, 8);
-       if (r < 0)
-         goto done;
-       bufferlist bl;
-       bl.append(buf, 8);
-       bufferlist::iterator p = bl.begin();
-       ::decode(end_size, p);
-       uint64_t cur_size;
-       image.size(&cur_size);
-       if (cur_size != end_size) {
-         image.resize(end_size);
-       }
-       if (from_stdin)
-         size = end_size;
-      } else if (tag == RBD_DIFF_WRITE || tag == RBD_DIFF_ZERO) {
-       uint64_t len;
-       char buf[16];
-       r = safe_read_exact(fd, buf, 16);
-       if (r < 0)
-         goto done;
-       bufferlist bl;
-       bl.append(buf, 16);
-       bufferlist::iterator p = bl.begin();
-       ::decode(off, p);
-       ::decode(len, p);
-
-       if (tag == RBD_DIFF_WRITE) {
-         bufferptr bp = buffer::create(len);
-         r = safe_read_exact(fd, bp.c_str(), len);
-         if (r < 0)
-           goto done;
-         bufferlist data;
-         data.append(bp);
-         image.write2(off, len, data, LIBRADOS_OP_FLAG_FADVISE_NOCACHE);
-       } else {
-         image.discard(off, len);
-       }
-      } else {
-       std::cerr << "unrecognized tag byte " << (int)tag
-                 << " in stream; aborting" << std::endl;
-       if (from_stdin) {
-         char buf[4096];
-         uint64_t len = min(length, uint64_t(4096));
-         while (len > 0) {
-           r = safe_read_exact(fd, buf, len);
-           if (r < 0)
-             return r;
-           length -= len;
-           len = min(length, uint64_t(4096));
-         }
-       } else {
-         off64_t offs = lseek64(fd, length, SEEK_CUR);
-         if (offs < 0) {
-           r = -errno;
-           goto done;
-         }
-       }
-      }
     }
-    if (!from_stdin) {
-      // progress through input
-      uint64_t off = lseek64(fd, 0, SEEK_CUR);
-      pc.update_progress(off, size);
-    } else if (size) {
-      // progress through image offsets.  this may jitter if blocks
-      // aren't in order, but it is better than nothing.
-      pc.update_progress(off, size);
+
+    if (tag == RBD_DIFF_FROM_SNAP) {
+      r = do_image_snap_from(&idiffctx);
+    } else if (tag == RBD_DIFF_TO_SNAP) {
+      r = do_image_snap_to(&idiffctx, &tosnap);
+    } else if (tag == RBD_DIFF_IMAGE_SIZE) {
+      r = do_image_resize(&idiffctx);
+    } else if (tag == RBD_DIFF_WRITE || tag == RBD_DIFF_ZERO) {
+      r = do_image_io(&idiffctx, (tag == RBD_DIFF_ZERO));
+    } else {
+      std::cerr << "unrecognized tag byte " << (int)tag << " in stream; skipping"
+                << std::endl;
+      r = skip_tag(fd, length);
     }
   }
-  // take final snap
-  if (to.length()) {
-    r = image.snap_create(to.c_str());
+
+  int temp_r = idiffctx.throttle.wait_for_ret();
+  r = (r < 0) ? r : temp_r; // preserve original error
+  if (r == 0 && tosnap.length()) {
+    idiffctx.image->snap_create(tosnap.c_str());
   }
 
-done:
-  if (r < 0)
-    pc.fail();
-  else
-    pc.finish();
+  idiffctx.finish(r);
   return r;
 }