git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: reimplement striping
author Sage Weil <sage@inktank.com>
Thu, 4 Oct 2012 23:53:27 +0000 (16:53 -0700)
committer Sage Weil <sage@inktank.com>
Mon, 15 Oct 2012 22:34:04 +0000 (15:34 -0700)
This replaces most of the existing striping code with use of the Filer
striping helper methods and a more general ceph_file_layout that can
handle more sophisticated striping patterns than the previous uniform
object approach.

This patch is not fully complete; there are a few additional patches that
follow that clean up some of the support functions.  However, most of the
IO path is covered here.

Signed-off-by: Sage Weil <sage@inktank.com>
src/Makefile.am
src/librbd/AioCompletion.cc
src/librbd/AioCompletion.h
src/librbd/AioRequest.cc
src/librbd/AioRequest.h
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/LibrbdWriteback.cc
src/librbd/internal.cc
src/librbd/internal.h

index f725617cd12369de1c48643ea8d4f652ed3d05fd..b58ce20ce42a693680705518941b026409b99454 100644 (file)
@@ -372,6 +372,7 @@ librbd_la_SOURCES = \
        librbd/LibrbdWriteback.cc \
        librbd/WatchCtx.cc \
        osdc/ObjectCacher.cc \
+       osdc/Filer.cc \
        cls/lock/cls_lock_client.cc \
        cls/lock/cls_lock_types.cc \
        cls/lock/cls_lock_ops.cc
index 767b232a4d91a3dd15e3ef654e99b82a05be75b5..de924e614b4ab8544952a7728283b2c04d4a0def 100644 (file)
@@ -31,6 +31,17 @@ namespace librbd {
     assert(pending_count);
     int count = --pending_count;
     if (!count) {
+      if (rval >= 0 && aio_type == AIO_TYPE_READ) {
+       // FIXME: make the destriper write directly into a buffer so
+       // that we avoid shuffling pointers and copying zeros around.
+       bufferlist bl;
+       destriper.assemble_result(bl, true);
+       assert(bl.length() == read_buf_len);
+       bl.copy(0, read_buf_len, read_buf);
+       ldout(cct, 20) << "AioCompletion::complete_request() copied resulting " << bl.length()
+                      << " bytes to " << (void*)read_buf << dendl;
+      }      
+
       complete();
     }
     put_unlock();
@@ -38,13 +49,15 @@ namespace librbd {
 
   void C_AioRead::finish(int r)
   {
-    ldout(m_cct, 10) << "C_AioRead::finish() " << this << dendl;
+    ldout(m_cct, 10) << "C_AioRead::finish() " << this << " r = " << r << dendl;
     if (r >= 0 || r == -ENOENT) { // this was a sparse_read operation
-      ldout(m_cct, 10) << "ofs=" << m_req->offset()
-                      << " len=" << m_req->length() << dendl;
-      r = handle_sparse_read(m_cct, m_req->data(), m_req->offset(),
-                            m_req->ext_map(), 0, m_req->length(),
-                            m_out_buf);
+      ldout(m_cct, 10) << " got " << m_req->m_ext_map
+                      << " for " << m_req->m_buffer_extents
+                      << " bl " << m_req->data().length() << dendl;
+      m_completion->destriper.add_partial_sparse_result(m_req->data(),
+                                                       m_req->m_ext_map, m_req->m_object_off,
+                                                       m_req->m_buffer_extents);
+      r = m_req->m_object_len;
     }
     m_completion->complete_request(m_cct, r);
   }
index e5cb16deadd94bbea828508eb6c4a956489302b1..6b3c2e675f77359857da7491506639b03e3a063e 100644 (file)
@@ -14,6 +14,8 @@
 #include "librbd/ImageCtx.h"
 #include "librbd/internal.h"
 
+#include "osdc/Filer.h"
+
 namespace librbd {
 
   class AioRead;
@@ -52,11 +54,16 @@ namespace librbd {
     utime_t start_time;
     aio_type_t aio_type;
 
+    Filer::StripedReadResult destriper;
+    char *read_buf;
+    size_t read_buf_len;
+
     AioCompletion() : lock("AioCompletion::lock", true),
                      done(false), rval(0), complete_cb(NULL),
                      complete_arg(NULL), rbd_comp(NULL), pending_count(1),
                      ref(1), released(false), ictx(NULL),
-                     aio_type(AIO_TYPE_NONE) {
+                     aio_type(AIO_TYPE_NONE),
+                     read_buf(NULL), read_buf_len(0) {
     }
     ~AioCompletion() {
     }
@@ -155,8 +162,9 @@ namespace librbd {
 
   class C_AioRead : public Context {
   public:
-    C_AioRead(CephContext *cct, AioCompletion *completion, char *out_buf)
-      : m_cct(cct), m_completion(completion), m_req(NULL), m_out_buf(out_buf) {}
+    C_AioRead(CephContext *cct, AioCompletion *completion)
+      : m_cct(cct), m_completion(completion), m_req(NULL)
+    { }
     virtual ~C_AioRead() {}
     virtual void finish(int r);
     void set_req(AioRead *req) {
@@ -166,7 +174,6 @@ namespace librbd {
     CephContext *m_cct;
     AioCompletion *m_completion;
     AioRead *m_req;
-    char *m_out_buf;
   };
 
   class C_AioWrite : public Context {
index f8a82236927ca26130bc440b64455e6ba4f6abd1..cc55c9d10147c89b84f3f595bfafed61273bd581 100644 (file)
 namespace librbd {
 
   AioRequest::AioRequest() :
-    m_ictx(NULL), m_image_ofs(0), m_block_ofs(0), m_len(0),
+    m_ictx(NULL),
     m_snap_id(CEPH_NOSNAP), m_completion(NULL), m_parent_completion(NULL),
     m_hide_enoent(false) {}
   AioRequest::AioRequest(ImageCtx *ictx, const std::string &oid,
-                        uint64_t image_ofs, size_t len,
+                        uint64_t objectno, uint64_t off, uint64_t len,
                         librados::snap_t snap_id,
                         Context *completion,
                         bool hide_enoent) {
@@ -30,9 +30,9 @@ namespace librbd {
     m_ioctx.dup(ictx->data_ctx);
     m_ioctx.snap_set_read(snap_id);
     m_oid = oid;
-    m_image_ofs = image_ofs;
-    m_block_ofs = get_block_ofs(ictx->order, image_ofs);
-    m_len = len;
+    m_object_no = objectno;
+    m_object_off = off;
+    m_object_len = len;
     m_snap_id = snap_id;
     m_completion = completion;
     m_parent_completion = NULL;
@@ -46,7 +46,7 @@ namespace librbd {
     }
   }
 
-  void AioRequest::read_from_parent(uint64_t image_ofs, size_t len)
+  void AioRequest::read_from_parent(vector<pair<uint64_t,uint64_t> >& image_extents)
   {
     ldout(m_ictx->cct, 20) << "read_from_parent this = " << this << dendl;
 
@@ -54,10 +54,12 @@ namespace librbd {
     assert(m_ictx->parent_lock.is_locked());
 
     m_parent_completion = aio_create_completion_internal(this, rbd_req_cb);
-    aio_read(m_ictx->parent, image_ofs, len, m_read_data.c_str(),
+    aio_read(m_ictx->parent, image_extents, m_read_data.c_str(),
             m_parent_completion);
   }
 
+  /** read **/
+
   bool AioRead::should_complete(int r)
   {
     ldout(m_ictx->cct, 20) << "read should_complete: r = " << r << dendl;
@@ -65,17 +67,32 @@ namespace librbd {
     if (!m_tried_parent && r == -ENOENT) {
       Mutex::Locker l(m_ictx->snap_lock);
       Mutex::Locker l2(m_ictx->parent_lock);
-      size_t len = m_ictx->parent_io_len(m_image_ofs, m_len, m_snap_id);
-      if (len) {
+
+      // calculate reverse mapping onto the image
+      vector<pair<uint64_t,uint64_t> > image_extents;
+      Filer::extent_to_file(m_ictx->cct, &m_ictx->layout,
+                           m_object_no, m_object_off, m_object_len,
+                           image_extents);
+
+      uint64_t image_overlap = 0;
+      r = m_ictx->get_parent_overlap(m_snap_id, &image_overlap);
+      if (r < 0) {
+       assert(0 == "FIXME");
+      }
+      uint64_t object_overlap = m_ictx->prune_parent_extents(image_extents, image_overlap);
+      if (object_overlap) {
        m_tried_parent = true;
-       // zero the buffer so we have the full requested length result,
-       // even if we actually read less due to overlap
-       ceph::buffer::ptr bp(len);
-       bp.zero();
+
+       ceph::buffer::ptr bp(object_overlap);
        m_read_data.append(bp);
-       // fill in single extent for sparse read callback
-       m_ext_map[m_block_ofs] = len;
-       read_from_parent(m_image_ofs, len);
+       if (object_overlap < m_object_len) {
+         ceph::buffer::ptr bp2(m_object_len - object_overlap);
+         bp2.zero();
+         m_read_data.append(bp2);
+       }
+
+       m_ext_map[m_object_off] = m_object_len;  // the parent IO will read this extent
+       read_from_parent(image_extents);
        return false;
       }
     }
@@ -89,26 +106,32 @@ namespace librbd {
     int r;
     if (m_sparse) {
       r = m_ioctx.aio_sparse_read(m_oid, rados_completion, &m_ext_map,
-                                 &m_read_data, m_len, m_block_ofs);
+                                 &m_read_data, m_object_len, m_object_off);
     } else {
       r = m_ioctx.aio_read(m_oid, rados_completion, &m_read_data,
-                          m_len, m_block_ofs);
+                          m_object_len, m_object_off);
     }
     rados_completion->release();
     return r;
   }
 
-  AbstractWrite::AbstractWrite() :
-    m_state(LIBRBD_AIO_WRITE_FINAL), m_has_parent(false) {}
+  /** write **/
+
+  AbstractWrite::AbstractWrite() : m_state(LIBRBD_AIO_WRITE_FINAL) {}
   AbstractWrite::AbstractWrite(ImageCtx *ictx, const std::string &oid,
-                              uint64_t image_ofs, size_t len,
-                              librados::snap_t snap_id, Context *completion,
-                              bool has_parent, const ::SnapContext &snapc,
+                              uint64_t object_no, uint64_t object_off, uint64_t len,
+                              vector<pair<uint64_t,uint64_t> >& objectx,
+                              uint64_t object_overlap,
+                              const ::SnapContext &snapc, librados::snap_t snap_id,
+                              Context *completion,
                               bool hide_enoent)
-    : AioRequest(ictx, oid, image_ofs, len, snap_id, completion, hide_enoent)
+    : AioRequest(ictx, oid, object_no, object_off, len, snap_id, completion, hide_enoent)
   {
     m_state = LIBRBD_AIO_WRITE_FINAL;
-    m_has_parent = has_parent;
+
+    m_object_image_extents = objectx;
+    m_parent_overlap = object_overlap;
+
     // TODO: find a way to make this less stupid
     std::vector<librados::snap_t> snaps;
     for (std::vector<snapid_t>::const_iterator it = snapc.snaps.begin();
@@ -120,11 +143,11 @@ namespace librbd {
 
   void AbstractWrite::guard_write()
   {
-    if (m_has_parent) {
+    if (has_parent()) {
       m_state = LIBRBD_AIO_WRITE_CHECK_EXISTS;
       m_read.stat(NULL, NULL, NULL);
     }
-    ldout(m_ictx->cct, 20) << __func__ << " m_has_parent = " << m_has_parent
+    ldout(m_ictx->cct, 20) << __func__ << " has_parent = " << has_parent()
                           << " m_state = " << m_state << " check exists = "
                           << LIBRBD_AIO_WRITE_CHECK_EXISTS << dendl;
       
@@ -147,19 +170,16 @@ namespace librbd {
       if (r == -ENOENT) {
        Mutex::Locker l(m_ictx->snap_lock);
        Mutex::Locker l2(m_ictx->parent_lock);
+
        // copyup the entire object up to the overlap point
-       uint64_t block_begin = m_image_ofs - m_block_ofs;
-       size_t len = m_ictx->parent_io_len(block_begin,
-                                          get_block_size(m_ictx->order),
-                                          m_snap_id);
-       if (len) {
-         ldout(m_ictx->cct, 20) << "reading from parent" << dendl;
-         m_state = LIBRBD_AIO_WRITE_COPYUP;
-         ceph::buffer::ptr bp(len);
-         m_read_data.append(bp);
-         read_from_parent(block_begin, len);
-         break;
-       }
+       ldout(m_ictx->cct, 20) << "reading from parent " << m_object_image_extents << dendl;
+       assert(m_object_image_extents.size());
+
+       m_state = LIBRBD_AIO_WRITE_COPYUP;
+       ceph::buffer::ptr bp(m_parent_overlap);
+       m_read_data.append(bp);
+       read_from_parent(m_object_image_extents);
+       break;
       }
       ldout(m_ictx->cct, 20) << "no need to read from parent" << dendl;
       m_state = LIBRBD_AIO_WRITE_FINAL;
index 79f0c5540b3bb505b64c72aa82d52d24d878a13d..ce1ad8db877451c56a6d4f3b8370aac72966c849 100644 (file)
@@ -26,21 +26,12 @@ namespace librbd {
   {
   public:
     AioRequest();
-    AioRequest(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
-              size_t len, librados::snap_t snap_id, Context *completion,
+    AioRequest(ImageCtx *ictx, const std::string &oid,
+              uint64_t objectno, uint64_t off, uint64_t len,
+              librados::snap_t snap_id, Context *completion,
               bool hide_enoent);
     virtual ~AioRequest();
 
-    uint64_t offset()
-    {
-      return m_block_ofs;
-    }
-
-    size_t length()
-    {
-      return m_len;
-    }
-
     void complete(int r)
     {
       if (should_complete(r)) {
@@ -55,14 +46,12 @@ namespace librbd {
     virtual int send() = 0;
 
   protected:
-    void read_from_parent(uint64_t image_ofs, size_t len);
+    void read_from_parent(vector<pair<uint64_t,uint64_t> >& image_extents);
 
     ImageCtx *m_ictx;
     librados::IoCtx m_ioctx;
     std::string m_oid;
-    uint64_t m_image_ofs;
-    uint64_t m_block_ofs;
-    size_t m_len;
+    uint64_t m_object_no, m_object_off, m_object_len;
     librados::snap_t m_snap_id;
     Context *m_completion;
     AioCompletion *m_parent_completion;
@@ -72,10 +61,13 @@ namespace librbd {
 
   class AioRead : public AioRequest {
   public:
-    AioRead(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
-           size_t len, librados::snap_t snap_id, bool sparse,
+    AioRead(ImageCtx *ictx, const std::string &oid,
+           uint64_t objectno, uint64_t offset, uint64_t len,
+           vector<pair<uint64_t,uint64_t> >& be,
+           librados::snap_t snap_id, bool sparse,
            Context *completion)
-      : AioRequest(ictx, oid, image_ofs, len, snap_id, completion, false),
+      : AioRequest(ictx, oid, objectno, offset, len, snap_id, completion, false),
+       m_buffer_extents(be),
        m_tried_parent(false), m_sparse(sparse) {
       m_ioctx.snap_set_read(m_snap_id);
     }
@@ -86,12 +78,12 @@ namespace librbd {
     ceph::bufferlist &data() {
       return m_read_data;
     }
-    std::map<uint64_t, uint64_t> &ext_map() {
-      return m_ext_map;
-    }
+    std::map<uint64_t, uint64_t> m_ext_map;
+
+    friend class C_AioRead;
 
   private:
-    std::map<uint64_t, uint64_t> m_ext_map;
+    vector<pair<uint64_t,uint64_t> > m_buffer_extents;
     bool m_tried_parent;
     bool m_sparse;
   };
@@ -99,14 +91,22 @@ namespace librbd {
   class AbstractWrite : public AioRequest {
   public:
     AbstractWrite();
-    AbstractWrite(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
-                 size_t len, librados::snap_t snap_id, Context *completion,
-                 bool has_parent, const ::SnapContext &snapc, bool hide_enoent);
+    AbstractWrite(ImageCtx *ictx, const std::string &oid,
+                 uint64_t object_no, uint64_t object_off, uint64_t len,
+                 vector<pair<uint64_t,uint64_t> >& objectx, uint64_t object_overlap,
+                 const ::SnapContext &snapc,
+                 librados::snap_t snap_id,
+                 Context *completion,
+                 bool hide_enoent);
     virtual ~AbstractWrite() {}
     virtual bool should_complete(int r);
     virtual int send();
     void guard_write();
 
+    bool has_parent() const {
+      return !m_object_image_extents.empty();
+    }
+
   private:
     /**
      * Writes go through the following state machine to
@@ -133,7 +133,8 @@ namespace librbd {
     virtual void add_copyup_ops() = 0;
 
     write_state_d m_state;
-    bool m_has_parent;
+    vector<pair<uint64_t,uint64_t> > m_object_image_extents;
+    uint64_t m_parent_overlap;
     librados::ObjectReadOperation m_read;
     librados::ObjectWriteOperation m_write;
     librados::ObjectWriteOperation m_copyup;
@@ -144,20 +145,26 @@ namespace librbd {
 
   class AioWrite : public AbstractWrite {
   public:
-    AioWrite(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
+    AioWrite(ImageCtx *ictx, const std::string &oid,
+            uint64_t object_no, uint64_t object_off,
+            vector<pair<uint64_t,uint64_t> >& objectx, uint64_t object_overlap,
             const ceph::bufferlist &data, const ::SnapContext &snapc,
-            librados::snap_t snap_id, bool has_parent, Context *completion)
-      : AbstractWrite(ictx, oid, image_ofs, data.length(), snap_id, completion,
-                     has_parent, snapc, false),
+            librados::snap_t snap_id,
+            Context *completion)
+      : AbstractWrite(ictx, oid,
+                     object_no, object_off, data.length(),
+                     objectx, object_overlap,
+                     snapc, snap_id,
+                     completion, false),
        m_write_data(data) {
       guard_write();
-      m_write.write(m_block_ofs, data);
+      m_write.write(m_object_off, data);
     }
     virtual ~AioWrite() {}
 
   protected:
     virtual void add_copyup_ops() {
-      m_copyup.write(m_block_ofs, m_write_data);
+      m_copyup.write(m_object_off, m_write_data);
     }
 
   private:
@@ -166,12 +173,17 @@ namespace librbd {
 
   class AioRemove : public AbstractWrite {
   public:
-    AioRemove(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
+    AioRemove(ImageCtx *ictx, const std::string &oid,
+             uint64_t object_no,
+             vector<pair<uint64_t,uint64_t> >& objectx, uint64_t object_overlap,
              const ::SnapContext &snapc, librados::snap_t snap_id,
-             bool has_parent, Context *completion)
-      : AbstractWrite(ictx, oid, image_ofs, 0, snap_id, completion,
-                     has_parent, snapc, true) {
-      if (has_parent)
+             Context *completion)
+      : AbstractWrite(ictx, oid,
+                     object_no, 0, 0,
+                     objectx, object_overlap,
+                     snapc, snap_id, completion,
+                     true) {
+      if (has_parent())
        m_write.truncate(0);
       else
        m_write.remove();
@@ -187,37 +199,47 @@ namespace librbd {
 
   class AioTruncate : public AbstractWrite {
   public:
-    AioTruncate(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
+    AioTruncate(ImageCtx *ictx, const std::string &oid,
+               uint64_t object_no, uint64_t object_off,
+               vector<pair<uint64_t,uint64_t> >& objectx, uint64_t object_overlap,
                const ::SnapContext &snapc, librados::snap_t snap_id,
-               bool has_parent, Context *completion)
-      : AbstractWrite(ictx, oid, image_ofs, 0, snap_id, completion,
-                     has_parent, snapc, true) {
+               Context *completion)
+      : AbstractWrite(ictx, oid,
+                     object_no, object_off, 0,
+                     objectx, object_overlap,
+                     snapc, snap_id, completion,
+                     true) {
       guard_write();
-      m_write.truncate(m_block_ofs);
+      m_write.truncate(object_off);
     }
     virtual ~AioTruncate() {}
 
   protected:
     virtual void add_copyup_ops() {
-      m_copyup.truncate(m_block_ofs);
+      m_copyup.truncate(m_object_off);
     }
   };
 
   class AioZero : public AbstractWrite {
   public:
-    AioZero(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
-           size_t len, const ::SnapContext &snapc, librados::snap_t snap_id,
-           bool has_parent, Context *completion)
-      : AbstractWrite(ictx, oid, image_ofs, len, snap_id, completion,
-                     has_parent, snapc, true) {
+    AioZero(ImageCtx *ictx, const std::string &oid,
+           uint64_t object_no, uint64_t object_off, uint64_t object_len,
+           vector<pair<uint64_t,uint64_t> >& objectx, uint64_t object_overlap,
+           const ::SnapContext &snapc, librados::snap_t snap_id,
+           Context *completion)
+      : AbstractWrite(ictx, oid,
+                     object_no, object_off, object_len,
+                     objectx, object_overlap,
+                     snapc, snap_id, completion,
+                     true) {
       guard_write();
-      m_write.zero(m_block_ofs, len);
+      m_write.zero(object_off, object_len);
     }
     virtual ~AioZero() {}
 
   protected:
     virtual void add_copyup_ops() {
-      m_copyup.zero(m_block_ofs, m_len);
+      m_copyup.zero(m_object_off, m_object_len);
     }
   };
 
index ed17165adc071316d91e6afae92934ee1663e330..50845524e6bbd00b02f909a6d2bfa6c534eaf1b8 100644 (file)
@@ -132,22 +132,30 @@ namespace librbd {
                   << cpp_strerror(r) << dendl;
        return r;
       }
+
+      init_layout();
     } else {
       header_oid = old_header_name(name);
     }
-
+    return 0;
+  }
+  
+  void ImageCtx::init_layout()
+  {
     if (stripe_unit == 0 || stripe_count == 0) {
       stripe_unit = 1ull << order;
       stripe_count = 1;
     }
 
-    // initialize layout
     memset(&layout, 0, sizeof(layout));
     layout.fl_stripe_unit = stripe_unit;
     layout.fl_stripe_count = stripe_count;
     layout.fl_object_size = 1ull << order;
     layout.fl_pg_pool = data_ctx.get_id();  // FIXME: pool id overflow?
-    return 0;
+
+    ldout(cct, 10) << "init_layout stripe_unit " << stripe_unit
+                  << " stripe_count " << stripe_count
+                  << " object_size " << layout.fl_object_size << dendl;
   }
 
   void ImageCtx::perf_start(string name) {
@@ -534,4 +542,26 @@ namespace librbd {
                   << parent_len << dendl;
     return parent_len;
   }
+
+  uint64_t ImageCtx::prune_parent_extents(vector<pair<uint64_t,uint64_t> >& objectx,
+                                         uint64_t overlap)
+  {
+    // drop extents completely beyond the overlap
+    while (!objectx.empty() && objectx.back().first >= overlap)
+      objectx.pop_back();
+
+    // trim final overlapping extent
+    if (!objectx.empty() && objectx.back().first + objectx.back().second > overlap)
+      objectx.back().second = overlap - objectx.back().first;
+
+    uint64_t len = 0;
+    for (vector<pair<uint64_t,uint64_t> >::iterator p = objectx.begin();
+        p != objectx.end();
+        ++p)
+      len += p->second;
+    ldout(cct, 10) << "prune_parent_extents image overlap " << overlap
+                  << ", object overlap " << len
+                  << " from image extents " << objectx << dendl;
+    return len;
+ }
 }
index 616da8ff6f7b4eae1453ec8a527f30adcd1e3b17..0e2f4b3780f2ef64b5b183b352e0192620b95af3 100644 (file)
@@ -93,6 +93,7 @@ namespace librbd {
              const char *snap, IoCtx& p);
     ~ImageCtx();
     int init();
+    void init_layout();
     void perf_start(std::string name);
     void perf_stop();
     int snap_set(std::string in_snap_name);
@@ -126,6 +127,9 @@ namespace librbd {
     void unregister_watch();
     size_t parent_io_len(uint64_t offset, size_t length,
                         librados::snap_t in_snap_id);
+    uint64_t prune_parent_extents(vector<pair<uint64_t,uint64_t> >& objectx,
+                                 uint64_t overlap);
+
   };
 }
 
index b420441abcf96b2e799978e8718dfd2be653bb63..9069da153a6c8b8823aed71538274e2158eb8ca3 100644 (file)
@@ -75,10 +75,12 @@ namespace librbd {
   {
     C_Request *req_comp = new C_Request(m_ictx->cct, onfinish, &m_lock);
     C_Read *read_comp = new C_Read(req_comp, pbl);
-    uint64_t total_off = offset_of_object(oid.name, m_ictx->object_prefix,
-                                         m_ictx->order) + off;
-    AioRead *req = new AioRead(m_ictx, oid.name, total_off, len, snapid.val,
-                              false, read_comp);
+    uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
+    vector<pair<uint64_t,uint64_t> > ex(1);
+    ex[0] = make_pair(off, len);
+    AioRead *req = new AioRead(m_ictx, oid.name,
+                              object_no, off, len, ex,
+                              snapid, false, read_comp);
     read_comp->set_req(req);
     req->send();
     return ++m_tid;
@@ -95,18 +97,25 @@ namespace librbd {
     m_ictx->snap_lock.Lock();
     librados::snap_t snap_id = m_ictx->snap_id;
     m_ictx->parent_lock.Lock();
-    int64_t parent_pool_id = m_ictx->get_parent_pool_id(snap_id);
     uint64_t overlap = 0;
     m_ictx->get_parent_overlap(snap_id, &overlap);
     m_ictx->parent_lock.Unlock();
     m_ictx->snap_lock.Unlock();
 
-    uint64_t total_off = offset_of_object(oid.name, m_ictx->object_prefix,
-                                         m_ictx->order) + off;
-    bool parent_exists = has_parent(parent_pool_id, total_off - off, overlap);
+    uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
+    
+    // reverse map this object extent onto the parent
+    vector<pair<uint64_t,uint64_t> > objectx;
+    Filer::extent_to_file(m_ictx->cct, &m_ictx->layout,
+                         object_no, 0, m_ictx->layout.fl_object_size,
+                         objectx);
+    uint64_t object_overlap = m_ictx->prune_parent_extents(objectx, overlap);
+
     C_Request *req_comp = new C_Request(m_ictx->cct, oncommit, &m_lock);
-    AioWrite *req = new AioWrite(m_ictx, oid.name, total_off, bl, snapc,
-                                snap_id, parent_exists, req_comp);
+    AioWrite *req = new AioWrite(m_ictx, oid.name,
+                                object_no, off, objectx, object_overlap,
+                                bl, snapc, snap_id,
+                                req_comp);
     req->send();
     return ++m_tid;
   }
index 1629472957f1bf7d095fcbc46b409f713e0a51f5..cb88239ccfe5dc7078d9a46788892301a072b938 100644 (file)
@@ -133,14 +133,21 @@ namespace librbd {
     return oss.str();
   }
 
-  uint64_t offset_of_object(const string &oid, const string &object_prefix,
-                           uint8_t order)
+  uint64_t oid_to_object_no(const string& oid, const string& object_prefix)
   {
     istringstream iss(oid);
     // skip object prefix and separator
     iss.ignore(object_prefix.length() + 1);
-    uint64_t num, offset;
+    uint64_t num;
     iss >> std::hex >> num;
+    return num;
+  }
+
+  uint64_t offset_of_object(const string &oid, const string &object_prefix,
+                           uint8_t order)
+  {
+    uint64_t num, offset;
+    num = oid_to_object_no(oid, object_prefix);
     offset = num * (1ULL << order);
     return offset;
   }
@@ -1546,6 +1553,7 @@ reprotect_and_return_err:
          ictx->order = ictx->header.options.order;
          ictx->size = ictx->header.image_size;
          ictx->object_prefix = ictx->header.block_name;
+         ictx->init_layout();
        } else {
          do {
            uint64_t incompatible_features;
@@ -2360,58 +2368,65 @@ reprotect_and_return_err:
     if (r < 0)
       return r;
 
-    size_t total_write = 0;
-    uint64_t start_block = get_block_num(ictx->order, off);
-    uint64_t end_block = get_block_num(ictx->order, off + len - 1);
-    uint64_t block_size = get_block_size(ictx->order);
+    r = check_io(ictx, off, len);
+    if (r < 0)
+      return r;
+
     ictx->snap_lock.Lock();
     snapid_t snap_id = ictx->snap_id;
     ::SnapContext snapc = ictx->snapc;
     ictx->parent_lock.Lock();
-    int64_t parent_pool_id = ictx->get_parent_pool_id(ictx->snap_id);
     uint64_t overlap = 0;
     ictx->get_parent_overlap(ictx->snap_id, &overlap);
     ictx->parent_lock.Unlock();
     ictx->snap_lock.Unlock();
-    uint64_t left = len;
-
-    r = check_io(ictx, off, len);
-    if (r < 0)
-      return r;
 
     if (snap_id != CEPH_NOSNAP)
       return -EROFS;
 
+    ldout(cct, 20) << "  parent overlap " << overlap << dendl;
+
+    // map
+    vector<ObjectExtent> extents;
+    Filer::file_to_extents(ictx->cct, ictx->format_string, &ictx->layout, off, len, extents);
+
+    size_t total_write = 0;
+
     c->get();
     c->init_time(ictx, AIO_TYPE_WRITE);
-    for (uint64_t i = start_block; i <= end_block; i++) {
-      string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
-      ldout(cct, 20) << "oid = '" << oid << "' i = " << i << dendl;
-      uint64_t total_off = off + total_write;
-      uint64_t block_ofs = get_block_ofs(ictx->order, total_off);
-      uint64_t write_len = min(block_size - block_ofs, left);
+    for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); ++p) {
+      ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
+                    << " from " << p->buffer_extents << dendl;
 
+      // assemble extent
       bufferlist bl;
-      bl.append(buf + total_write, write_len);
+      for (vector<pair<uint64_t,uint64_t> >::iterator q = p->buffer_extents.begin();
+          q != p->buffer_extents.end();
+          ++q) {
+       bl.append(buf + q->first, q->second);
+      }
+
       if (ictx->object_cacher) {
        // may block
-       ictx->write_to_cache(oid, bl, write_len, block_ofs);
+       ictx->write_to_cache(p->oid, bl, p->length, p->offset);
       } else {
+       // reverse map this object extent onto the parent
+       vector<pair<uint64_t,uint64_t> > objectx;
+       Filer::extent_to_file(ictx->cct, &ictx->layout,
+                             p->objectno, 0, ictx->layout.fl_object_size,
+                             objectx);
+       uint64_t object_overlap = ictx->prune_parent_extents(objectx, overlap);
+
        C_AioWrite *req_comp = new C_AioWrite(cct, c);
-       bool parent_exists = has_parent(parent_pool_id, total_off - block_ofs, overlap);
-       ldout(ictx->cct, 20) << "has_parent(pool=" << parent_pool_id
-                            << ", off=" << total_off
-                            << ", overlap=" << overlap << ") = "
-                            << parent_exists << dendl;
-       AioWrite *req = new AioWrite(ictx, oid, total_off, bl, snapc, snap_id,
-                                    parent_exists, req_comp);
+       AioWrite *req = new AioWrite(ictx, p->oid.name, p->objectno, p->offset,
+                                    objectx, object_overlap,
+                                    bl, snapc, snap_id, req_comp);
        c->add_request();
        r = req->send();
        if (r < 0)
          goto done;
       }
-      total_write += write_len;
-      left -= write_len;
+      total_write += bl.length();
     }
   done:
     c->finish_adding_requests();
@@ -2437,69 +2452,63 @@ reprotect_and_return_err:
     if (r < 0)
       return r;
 
+    r = check_io(ictx, off, len);
+    if (r < 0)
+      return r;
+
     // TODO: check for snap
-    size_t total_write = 0;
-    uint64_t start_block = get_block_num(ictx->order, off);
-    uint64_t end_block = get_block_num(ictx->order, off + len - 1);
-    uint64_t block_size = get_block_size(ictx->order);
     ictx->snap_lock.Lock();
     snapid_t snap_id = ictx->snap_id;
     ::SnapContext snapc = ictx->snapc;
     ictx->parent_lock.Lock();
-    int64_t parent_pool_id = ictx->get_parent_pool_id(ictx->snap_id);
     uint64_t overlap = 0;
     ictx->get_parent_overlap(ictx->snap_id, &overlap);
     ictx->parent_lock.Unlock();
     ictx->snap_lock.Unlock();
-    uint64_t left = len;
 
-    r = check_io(ictx, off, len);
-    if (r < 0)
-      return r;
-
-    vector<ObjectExtent> v;
-    if (ictx->object_cacher)
-      v.reserve(end_block - start_block + 1);
+    // map
+    vector<ObjectExtent> extents;
+    Filer::file_to_extents(ictx->cct, ictx->format_string, &ictx->layout, off, len, extents);
 
     c->get();
     c->init_time(ictx, AIO_TYPE_DISCARD);
-    for (uint64_t i = start_block; i <= end_block; i++) {
-      string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
-      uint64_t total_off = off + total_write;
-      uint64_t block_ofs = get_block_ofs(ictx->order, total_off);;
-      uint64_t write_len = min(block_size - block_ofs, left);
-
-      if (ictx->object_cacher) {
-       v.push_back(ObjectExtent(oid, 0, block_ofs, write_len));
-       v.back().oloc.pool = ictx->data_ctx.get_id();
-      }
-
+    for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); ++p) {
+      ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
+                    << " from " << p->buffer_extents << dendl;
       C_AioWrite *req_comp = new C_AioWrite(cct, c);
       AbstractWrite *req;
       c->add_request();
 
-      bool parent_exists = has_parent(parent_pool_id, total_off - block_ofs, overlap);
-      if (block_ofs == 0 && write_len == block_size) {
-       req = new AioRemove(ictx, oid, total_off, snapc, snap_id,
-                           parent_exists, req_comp);
-      } else if (block_ofs + write_len == block_size) {
-       req = new AioTruncate(ictx, oid, total_off, snapc, snap_id,
-                             parent_exists, req_comp);
+      // reverse map this object extent onto the parent
+      vector<pair<uint64_t,uint64_t> > objectx;
+      uint64_t object_overlap = 0;
+      if (off < overlap) {   // we might overlap...
+       Filer::extent_to_file(ictx->cct, &ictx->layout,
+                             p->objectno, 0, ictx->layout.fl_object_size,
+                             objectx);
+       object_overlap = ictx->prune_parent_extents(objectx, overlap);
+      }
+
+      if (p->offset == 0 && p->length == ictx->layout.fl_object_size) {
+       req = new AioRemove(ictx, p->oid.name, p->objectno, objectx, object_overlap,
+                           snapc, snap_id, req_comp);
+      } else if (p->offset + p->length == ictx->layout.fl_object_size) {
+       req = new AioTruncate(ictx, p->oid.name, p->objectno, p->offset, objectx, object_overlap,
+                             snapc, snap_id, req_comp);
       } else {
-       req = new AioZero(ictx, oid, total_off, write_len, snapc, snap_id,
-                         parent_exists, req_comp);
+       req = new AioZero(ictx, p->oid.name, p->objectno, p->offset, p->length,
+                         objectx, object_overlap,
+                         snapc, snap_id, req_comp);
       }
 
       r = req->send();
       if (r < 0)
        goto done;
-      total_write += write_len;
-      left -= write_len;
     }
     r = 0;
   done:
     if (ictx->object_cacher)
-      ictx->object_cacher->discard_set(ictx->object_set, v);
+      ictx->object_cacher->discard_set(ictx->object_set, extents);
 
     c->finish_adding_requests();
     c->put();
@@ -2522,50 +2531,71 @@ reprotect_and_return_err:
               char *buf,
               AioCompletion *c)
   {
-    ldout(ictx->cct, 20) << "aio_read " << ictx << " off = " << off << " len = "
-                        << len << dendl;
+    vector<pair<uint64_t,uint64_t> > image_extents(1);
+    image_extents[0] = make_pair(off, len);
+    return aio_read(ictx, image_extents, buf, c);
+  }
 
-    int r = ictx_check(ictx);
-    if (r < 0)
-      return r;
+  int aio_read(ImageCtx *ictx, const vector<pair<uint64_t,uint64_t> >& image_extents,
+              char *buf,
+              AioCompletion *c)
+  {
+    ldout(ictx->cct, 20) << "aio_read " << ictx << " " << image_extents << dendl;
 
-    r = check_io(ictx, off, len);
+    int r = ictx_check(ictx);
     if (r < 0)
       return r;
 
-    int64_t ret;
-    int total_read = 0;
-    uint64_t start_block = get_block_num(ictx->order, off);
-    uint64_t end_block = get_block_num(ictx->order, off + len - 1);
-    uint64_t block_size = get_block_size(ictx->order);
     ictx->snap_lock.Lock();
     snap_t snap_id = ictx->snap_id;
     ictx->snap_lock.Unlock();
-    uint64_t left = len;
+
+    // map
+    vector<ObjectExtent> extents;
+
+    uint64_t buffer_ofs = 0;
+    for (vector<pair<uint64_t,uint64_t> >::const_iterator p = image_extents.begin();
+        p != image_extents.end();
+        ++p) {
+      r = check_io(ictx, p->first, p->second);
+      if (r < 0)
+       return r;
+      
+      Filer::file_to_extents(ictx->cct, ictx->format_string, &ictx->layout,
+                            p->first, p->second, extents, buffer_ofs);
+      buffer_ofs += p->second;
+    }
+
+    int64_t ret;
+
+    c->read_buf = buf;
+    c->read_buf_len = buffer_ofs;
 
     c->get();
     c->init_time(ictx, AIO_TYPE_READ);
-    for (uint64_t i = start_block; i <= end_block; i++) {
-      string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
-      uint64_t block_ofs = get_block_ofs(ictx->order, off + total_read);
-      uint64_t read_len = min(block_size - block_ofs, left);
-
-      C_AioRead *req_comp = new C_AioRead(ictx->cct, c, buf + total_read);
-      AioRead *req = new AioRead(ictx, oid, off + total_read,
-                                read_len, snap_id, true, req_comp);
+    for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); ++p) {
+      ldout(ictx->cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
+                          << " from " << p->buffer_extents << dendl;
+
+      C_AioRead *req_comp = new C_AioRead(ictx->cct, c);
+      AioRead *req = new AioRead(ictx, p->oid.name, 
+                                p->objectno, p->offset, p->length,
+                                p->buffer_extents,
+                                snap_id, true, req_comp);
       req_comp->set_req(req);
       c->add_request();
 
       if (ictx->object_cacher) {
-       req->ext_map()[block_ofs] = read_len;
        // cache has already handled possible reading from parent, so
        // this AioRead is just used to pass data to the
        // AioCompletion. The AioRead isn't being used as a
        // completion, so wrap the completion in a C_CacheRead to
        // delete it
        C_CacheRead *cache_comp = new C_CacheRead(req_comp, req);
-       ictx->aio_read_from_cache(oid, &req->data(),
-                                 read_len, block_ofs, cache_comp);
+       req->m_ext_map[p->offset] = p->length;
+       ictx->aio_read_from_cache(p->oid, &req->data(),
+                                 p->length, p->offset,
+                                 cache_comp);
       } else {
        r = req->send();
        if (r < 0 && r == -ENOENT)
@@ -2575,17 +2605,14 @@ reprotect_and_return_err:
          goto done;
        }
       }
-
-      total_read += read_len;
-      left -= read_len;
     }
-    ret = total_read;
+    ret = buffer_ofs;
   done:
     c->finish_adding_requests();
     c->put();
 
     ictx->perfcounter->inc(l_librbd_aio_rd);
-    ictx->perfcounter->inc(l_librbd_aio_rd_bytes, len);
+    ictx->perfcounter->inc(l_librbd_aio_rd_bytes, buffer_ofs);
 
     return ret;
   }
index c26c5086fdf91247edee125895dbb0774a8668c2..ab689ff1837666ba4d7fdd30601f3603405effbf 100644 (file)
@@ -154,6 +154,7 @@ namespace librbd {
   void image_info(const ImageCtx *ictx, image_info_t& info, size_t info_size);
   std::string get_block_oid(const std::string &object_prefix, uint64_t num,
                            bool old_format);
+  uint64_t oid_to_object_no(const string& oid, const string& object_prefix);
   uint64_t offset_of_object(const string &oid, const string &object_prefix,
                            uint8_t order);
   uint64_t get_max_block(uint64_t size, uint8_t obj_order);
@@ -176,6 +177,9 @@ namespace librbd {
   int aio_discard(ImageCtx *ictx, uint64_t off, uint64_t len, AioCompletion *c);
   int aio_read(ImageCtx *ictx, uint64_t off, size_t len,
                char *buf, AioCompletion *c);
+  int aio_read(ImageCtx *ictx, const vector<pair<uint64_t,uint64_t> >& image_extents,
+              char *buf,
+              AioCompletion *c);
   int flush(ImageCtx *ictx);
   int _flush(ImageCtx *ictx);