git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: Use object map for IO operations
author Jason Dillaman <dillaman@redhat.com>
Thu, 9 Oct 2014 08:38:29 +0000 (04:38 -0400)
committer Jason Dillaman <dillaman@redhat.com>
Thu, 29 Jan 2015 02:12:52 +0000 (21:12 -0500)
When the RBD object map feature is enabled, IO operations
now check whether or not an object exists before sending an
IO operation to RADOS.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
16 files changed:
src/client/ObjecterWriteback.h
src/librbd/AioRequest.cc
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/ImageWatcher.cc
src/librbd/LibrbdWriteback.cc
src/librbd/LibrbdWriteback.h
src/librbd/internal.cc
src/librbd/internal.h
src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h
src/osdc/WritebackHandler.h
src/test/osdc/FakeWriteback.cc
src/test/osdc/FakeWriteback.h
src/test/pybind/test_rbd.py
src/test/run-rbd-tests

index b7e4bd2571285c0f206090764b9c1a2974a5d07b..f6506fa953dd58845c5d2fd1d36808910a905c7c 100644 (file)
@@ -14,10 +14,10 @@ class ObjecterWriteback : public WritebackHandler {
       m_lock(lock) { }
   virtual ~ObjecterWriteback() {}
 
-  virtual void read(const object_t& oid, const object_locator_t& oloc,
-                   uint64_t off, uint64_t len, snapid_t snapid,
-                   bufferlist *pbl, uint64_t trunc_size,  __u32 trunc_seq,
-                   Context *onfinish) {
+  virtual void read(const object_t& oid, uint64_t object_no,
+                   const object_locator_t& oloc, uint64_t off, uint64_t len,
+                   snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
+                   __u32 trunc_seq, Context *onfinish) {
     m_objecter->read_trunc(oid, oloc, off, len, snapid, pbl, 0,
                           trunc_size, trunc_seq,
                           new C_OnFinisher(new C_Lock(m_lock, onfinish),
index 9c733b162c402ea51573e0b283eb7275848a0ba7..ea898e8b6f08e3eba0bba14b86f86ced87c5c78d 100644 (file)
@@ -195,6 +195,12 @@ namespace librbd {
   int AioRead::send() {
     ldout(m_ictx->cct, 20) << "send " << this << " " << m_oid << " " << m_object_off << "~" << m_object_len << dendl;
 
+    // send read request to parent if the object doesn't exist locally
+    if (!m_ictx->object_may_exist(m_object_no)) {
+      complete(-ENOENT);
+      return 0;
+    }
+
     librados::AioCompletion *rados_completion =
       librados::Rados::aio_create_completion(this, rados_req_cb, NULL);
     int r;
index 53a37ae827cc20b28509c8ed67dfccb1328409ae..d17e6fd357e0b5d65093619aff7869c01db059cc 100644 (file)
@@ -486,12 +486,13 @@ namespace librbd {
     return -ENOENT;
   }
 
-  void ImageCtx::aio_read_from_cache(object_t o, bufferlist *bl, size_t len,
+  void ImageCtx::aio_read_from_cache(object_t o, uint64_t object_no,
+                                    bufferlist *bl, size_t len,
                                     uint64_t off, Context *onfinish) {
     snap_lock.get_read();
     ObjectCacher::OSDRead *rd = object_cacher->prepare_read(snap_id, bl, 0);
     snap_lock.put_read();
-    ObjectExtent extent(o, 0 /* a lie */, off, len, 0);
+    ObjectExtent extent(o, object_no, off, len, 0);
     extent.oloc.pool = data_ctx.get_id();
     extent.buffer_extents.push_back(make_pair(0, len));
     rd->extents.push_back(extent);
@@ -520,14 +521,14 @@ namespace librbd {
     }
   }
 
-  int ImageCtx::read_from_cache(object_t o, bufferlist *bl, size_t len,
-                               uint64_t off) {
+  int ImageCtx::read_from_cache(object_t o, uint64_t object_no, bufferlist *bl,
+                               size_t len, uint64_t off) {
     int r;
     Mutex mylock("librbd::ImageCtx::read_from_cache");
     Cond cond;
     bool done;
     Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
-    aio_read_from_cache(o, bl, len, off, onfinish);
+    aio_read_from_cache(o, object_no, bl, len, off, onfinish);
     mylock.Lock();
     while (!done)
       cond.Wait(mylock);
@@ -684,12 +685,26 @@ namespace librbd {
     }
   }
 
+  bool ImageCtx::object_may_exist(uint64_t object_no) const
+  {
+    // Fall back to default logic if object map is disabled
+    if ((features & RBD_FEATURE_OBJECT_MAP) == 0 /* || invalid map */) {
+      return true;
+    }
+
+    RWLock::RLocker l(object_map_lock);
+    assert(object_no < object_map.size());
+    return (object_map[object_no] == OBJECT_EXISTS ||
+           object_map[object_no] == OBJECT_PENDING);
+  }
+
   int ImageCtx::refresh_object_map()
   {
     if ((features & RBD_FEATURE_OBJECT_MAP) == 0) {
       return 0;
     }
 
+    RWLock::WLocker l(object_map_lock);
     int r = cls_client::object_map_load(&data_ctx, object_map_name(id),
                                        &object_map);
     if (r < 0) {
@@ -720,6 +735,7 @@ namespace librbd {
       return 0;
     }
 
+    RWLock::WLocker l(object_map_lock);
     uint64_t num_objs = Striper::get_num_objects(layout, get_current_size());
     ldout(cct, 20) << "resizing object map: " << num_objs << dendl;
     librados::ObjectWriteOperation op;
@@ -755,6 +771,7 @@ namespace librbd {
       return 0;
     }
 
+    RWLock::WLocker l(object_map_lock);
     assert(start_object_no <= end_object_no);
     assert(/* flagged as invalid || */ end_object_no <= object_map.size());
     if (end_object_no > object_map.size()) {
@@ -786,7 +803,7 @@ namespace librbd {
     int r = data_ctx.operate(object_map_name(id), &op);
     if (r < 0) {
       lderr(cct) << "object map update failed: " << cpp_strerror(r) << dendl;
-      // TODO: disable object map
+      // TODO: remove RBD_FEATURE_EXCLUSIVE_LOCK feature on image
     } else {
       for (uint64_t object_no = start_object_no; object_no < end_object_no;
            ++object_no) {
index e6c741f14c80b16a11600ca6b0d3006432377f3a..1c148d5e90cca7d59a9f901ac4470606dc470eb0 100644 (file)
@@ -160,11 +160,12 @@ namespace librbd {
     uint64_t get_parent_snap_id(librados::snap_t in_snap_id) const;
     int get_parent_overlap(librados::snap_t in_snap_id,
                           uint64_t *overlap) const;
-    void aio_read_from_cache(object_t o, bufferlist *bl, size_t len,
-                            uint64_t off, Context *onfinish);
+    void aio_read_from_cache(object_t o, uint64_t object_no, bufferlist *bl,
+                            size_t len, uint64_t off, Context *onfinish);
     void write_to_cache(object_t o, bufferlist& bl, size_t len, uint64_t off,
                        Context *onfinish);
-    int read_from_cache(object_t o, bufferlist *bl, size_t len, uint64_t off);
+    int read_from_cache(object_t o, uint64_t object_no, bufferlist *bl,
+                       size_t len, uint64_t off);
     void user_flushed();
     void flush_cache_aio(Context *onfinish);
     int flush_cache();
@@ -180,6 +181,8 @@ namespace librbd {
     void wait_for_pending_aio();
     void wait_for_pending_copyup();
 
+    /* object map */
+    bool object_may_exist(uint64_t object_no) const;
     int refresh_object_map();
     int resize_object_map(uint8_t default_object_state);
     int update_object_map(uint64_t object_no, uint8_t object_state);
index b1fea4af62f9366acdade307f31d51fb3330a2fa..34d485914d1780c3cb86ce9cfb1aae0dfc398ce9 100644 (file)
@@ -320,6 +320,12 @@ int ImageWatcher::lock() {
   ldout(m_image_ctx.cct, 20) << "acquired exclusive lock" << dendl;
   m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
 
+  r = m_image_ctx.refresh_object_map();
+  if (r < 0) {
+    unlock();
+    return r;
+  }
+
   bufferlist bl;
   ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
   ::encode(NOTIFY_OP_ACQUIRED_LOCK, bl);
index 854ac9dc3e78becf328652f330d560e9bedadb39..78e2c2d2dbf39f8fcf236054e809efb50564a956 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "common/ceph_context.h"
 #include "common/dout.h"
+#include "common/Finisher.h"
 #include "common/Mutex.h"
 #include "include/Context.h"
 #include "include/rados/librados.hpp"
@@ -85,11 +86,17 @@ namespace librbd {
   };
 
   LibrbdWriteback::LibrbdWriteback(ImageCtx *ictx, Mutex& lock)
-    : m_tid(0), m_lock(lock), m_ictx(ictx)
+    : m_finisher(new Finisher(ictx->cct)), m_tid(0), m_lock(lock), m_ictx(ictx)
   {
+    m_finisher->start();
   }
 
-  void LibrbdWriteback::read(const object_t& oid,
+  LibrbdWriteback::~LibrbdWriteback() {
+    m_finisher->stop();
+    delete m_finisher;
+  }
+
+  void LibrbdWriteback::read(const object_t& oid, uint64_t object_no,
                             const object_locator_t& oloc,
                             uint64_t off, uint64_t len, snapid_t snapid,
                             bufferlist *pbl, uint64_t trunc_size,
@@ -97,6 +104,11 @@ namespace librbd {
   {
     // on completion, take the mutex and then call onfinish.
     Context *req = new C_Request(m_ictx->cct, onfinish, &m_lock);
+    if (!m_ictx->object_may_exist(object_no)) {
+      m_finisher->queue(req, -ENOENT);
+      return;
+    }
+
     librados::AioCompletion *rados_completion =
       librados::Rados::aio_create_completion(req, context_cb, NULL);
     librados::ObjectReadOperation op;
index a8bd9cb2b57a1436982b9b11b08ab6d9fdcfeefe..0212dad9a2cffb8a9c81a3af1a387f14e466a5fa 100644 (file)
@@ -11,6 +11,7 @@
 #include "osd/osd_types.h"
 #include "osdc/WritebackHandler.h"
 
+class Finisher;
 class Mutex;
 
 namespace librbd {
@@ -20,13 +21,13 @@ namespace librbd {
   class LibrbdWriteback : public WritebackHandler {
   public:
     LibrbdWriteback(ImageCtx *ictx, Mutex& lock);
-    virtual ~LibrbdWriteback() {}
+    virtual ~LibrbdWriteback();
 
     // Note that oloc, trunc_size, and trunc_seq are ignored
-    virtual void read(const object_t& oid, const object_locator_t& oloc,
-                     uint64_t off, uint64_t len, snapid_t snapid,
-                     bufferlist *pbl, uint64_t trunc_size,  __u32 trunc_seq,
-                     Context *onfinish);
+    virtual void read(const object_t& oid, uint64_t object_no,
+                     const object_locator_t& oloc, uint64_t off, uint64_t len,
+                     snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
+                     __u32 trunc_seq, Context *onfinish);
 
     // Determine whether a read to this extent could be affected by a write-triggered copy-on-write
     virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid);
@@ -52,6 +53,7 @@ namespace librbd {
   private:
     void complete_writes(const std::string& oid);
 
+    Finisher *m_finisher;
     ceph_tid_t m_tid;
     Mutex& m_lock;
     librbd::ImageCtx *m_ictx;
index 50b99f26022f7121e42ae0ab4fb0317181821e99..9feaa18f520f26707082fcb29da2cf3bd6188ec4 100644 (file)
@@ -296,6 +296,20 @@ namespace librbd {
     return io_ctx.tmap_update(RBD_DIRECTORY, cmdbl);
   }
 
+  void rollback_object(ImageCtx *ictx, uint64_t snap_id, const string& oid,
+                      SimpleThrottle& throttle)
+  {
+    Context *req_comp = new C_SimpleThrottle(&throttle);
+    librados::AioCompletion *rados_completion =
+      librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
+    librados::ObjectWriteOperation op;
+    op.selfmanaged_snap_rollback(snap_id);
+    ictx->data_ctx.aio_operate(oid, rados_completion, &op);
+    ldout(ictx->cct, 10) << "scheduling selfmanaged_snap_rollback on "
+                         << oid << " to " << snap_id << dendl;
+    rados_completion->release();
+  }
+
   int rollback_image(ImageCtx *ictx, uint64_t snap_id,
                     ProgressContext& prog_ctx)
   {
@@ -312,17 +326,10 @@ namespace librbd {
 
     for (uint64_t i = 0; i < numseg; i++) {
       string oid = ictx->get_object_name(i);
-      Context *req_comp = new C_SimpleThrottle(&throttle);
-      librados::AioCompletion *rados_completion =
-       librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
-      librados::ObjectWriteOperation op;
-      op.selfmanaged_snap_rollback(snap_id);
-      ictx->data_ctx.aio_operate(oid, rados_completion, &op);
-      ldout(cct, 10) << "scheduling selfmanaged_snap_rollback on "
-                    << oid << " to " << snap_id << dendl;
-      rados_completion->release();
+      rollback_object(ictx, snap_id, ictx->get_object_name(i), throttle);
       prog_ctx.update_progress(i * bsize, numseg * bsize);
     }
+    rollback_object(ictx, snap_id, object_map_name(ictx->id), throttle);
 
     r = throttle.wait_for_ret();
     if (r < 0) {
@@ -826,6 +833,8 @@ reprotect_and_return_err:
     ostringstream oss;
     CephContext *cct = (CephContext *)io_ctx.cct();
 
+    ceph_file_layout layout;
+
     id_obj = id_obj_name(imgname);
 
     int r = io_ctx.create(id_obj, true);
@@ -872,6 +881,31 @@ reprotect_and_return_err:
       }
     }
 
+    if ((features & RBD_FEATURE_OBJECT_MAP) != 0) {
+      if ((features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) {
+        lderr(cct) << "cannot use object map without exclusive lock" << dendl;
+        goto err_remove_header;
+      }
+
+      memset(&layout, 0, sizeof(layout));
+      layout.fl_object_size = 1ull << order;
+      if (stripe_unit == 0 || stripe_count == 0) {
+        layout.fl_stripe_unit = layout.fl_object_size;
+        layout.fl_stripe_count = 1;
+      } else {
+        layout.fl_stripe_unit = stripe_unit;
+        layout.fl_stripe_count = stripe_count;
+      }
+
+      librados::ObjectWriteOperation op;
+      cls_client::object_map_resize(&op, Striper::get_num_objects(layout, size),
+                                    OBJECT_NONEXISTENT);
+      r = io_ctx.operate(object_map_name(id), &op);
+      if (r < 0) {
+        goto err_remove_header;
+      }
+    }
+
     ldout(cct, 2) << "done." << dendl;
     return 0;
 
@@ -1532,6 +1566,11 @@ reprotect_and_return_err:
       }
     }
     if (!old_format) {
+      r = io_ctx.remove(object_map_name(id));
+      if (r < 0 && r != -ENOENT) {
+       lderr(cct) << "error removing image object map" << dendl;
+      }
+
       ldout(cct, 2) << "removing id object..." << dendl;
       r = io_ctx.remove(id_obj_name(imgname));
       if (r < 0 && r != -ENOENT) {
@@ -1735,6 +1774,9 @@ reprotect_and_return_err:
                           << dendl;
        return;
       }
+
+      m_ictx->size = m_new_size;
+      m_ictx->resize_object_map(OBJECT_NONEXISTENT);
     }
 
   private:
@@ -1796,6 +1838,10 @@ reprotect_and_return_err:
         return -ERESTART;
       }
 
+      if (!m_ictx->object_may_exist(m_object_no)) {
+       return 1;
+      }
+
       string oid = m_ictx->get_object_name(m_object_no);
       librados::AioCompletion *rados_completion =
        librados::Rados::aio_create_completion(this, NULL, rados_ctx_cb);
@@ -1822,7 +1868,7 @@ reprotect_and_return_err:
     }
 
     virtual void finish(int r) {
-      if (r < 0 || m_delete_offset <= m_new_size) {
+      if (r < 0) {
        m_ctx->complete(r);
        return;
       }
@@ -1834,6 +1880,13 @@ reprotect_and_return_err:
        return;
       }
 
+      m_ictx->update_object_map(m_delete_start, m_num_objects,
+                               OBJECT_NONEXISTENT, OBJECT_PENDING);
+      if (m_delete_offset <= m_new_size) {
+       m_ctx->complete(r);
+       return;
+      }
+
       // discard the weird boundary, if any
       vector<ObjectExtent> extents;
       Striper::file_to_extents(m_ictx->cct, m_ictx->format_string,
@@ -1847,14 +1900,25 @@ reprotect_and_return_err:
        Context *req_comp = new C_ContextCompletion(*completion);
        librados::AioCompletion *rados_completion =
          librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
+
+       bool flag_nonexistent = false;
        if (p->offset == 0) {
+         flag_nonexistent = true;
+         m_ictx->update_object_map(p->objectno, p->objectno + 1,
+                                   OBJECT_PENDING, OBJECT_EXISTS);
          m_ictx->data_ctx.aio_remove(p->oid.name, rados_completion);
        } else {
+         m_ictx->update_object_map(p->objectno, OBJECT_EXISTS);
          librados::ObjectWriteOperation op;
          op.truncate(p->offset);
          m_ictx->data_ctx.aio_operate(p->oid.name, rados_completion, &op);
        }
        rados_completion->release();
+
+       if (flag_nonexistent) {
+         m_ictx->update_object_map(p->objectno, p->objectno + 1,
+                                   OBJECT_NONEXISTENT, OBJECT_PENDING);
+       }
       }
       completion->finish_adding_requests();
     }
@@ -1894,6 +1958,9 @@ reprotect_and_return_err:
       ldout(cct, 2) << "trim_image objects " << delete_start << " to "
                    << (num_objects - 1) << dendl;
 
+      ictx->update_object_map(delete_start, num_objects, OBJECT_PENDING,
+                             OBJECT_EXISTS);
+
       AsyncObjectThrottle::ContextFactory context_factory(
         boost::lambda::bind(boost::lambda::new_ptr<AsyncTrimObjectContext>(),
           boost::lambda::_1, ictx, boost::lambda::_2));
@@ -2084,9 +2151,9 @@ reprotect_and_return_err:
     vector<parent_info> snap_parents;
     vector<uint8_t> snap_protection;
     {
+      int r;
       RWLock::WLocker l(ictx->snap_lock);
       {
-        int r;
        RWLock::WLocker l2(ictx->parent_lock);
        ictx->lockers.clear();
        if (ictx->old_format) {
@@ -2215,6 +2282,13 @@ reprotect_and_return_err:
        ictx->snap_exists = false;
       }
 
+      if (ictx->snap_exists) {
+       r = ictx->refresh_object_map();
+       if (r < 0) {
+         return r;
+       }
+      }
+
       ictx->data_ctx.selfmanaged_snap_set_write_ctx(ictx->snapc.seq, ictx->snaps);
     } // release snap_lock
 
@@ -2497,6 +2571,7 @@ reprotect_and_return_err:
       return r;
     }
 
+    ictx->refresh_object_map();
     refresh_parent(ictx);
     return 0;
   }
@@ -3603,6 +3678,11 @@ reprotect_and_return_err:
        bl.append(buf + q->first, q->second);
       }
 
+      r = ictx->update_object_map(p->objectno, OBJECT_EXISTS);
+      if (r < 0) {
+       goto done;
+      }
+
       C_AioWrite *req_comp = new C_AioWrite(cct, c);
       if (ictx->object_cacher) {
        c->add_request();
@@ -3702,9 +3782,15 @@ reprotect_and_return_err:
        object_overlap = ictx->prune_parent_extents(objectx, overlap);
       }
 
+      bool flag_nonexistent = false;
       if (p->offset == 0 && p->length == ictx->layout.fl_object_size) {
        req = new AioRemove(ictx, p->oid.name, p->objectno, objectx, object_overlap,
                            snapc, snap_id, req_comp);
+       if (!req->has_parent()) {
+          ictx->update_object_map(p->objectno, p->objectno + 1, OBJECT_PENDING,
+                                 OBJECT_EXISTS);
+         flag_nonexistent = true;
+       }
       } else if (p->offset + p->length == ictx->layout.fl_object_size) {
        req = new AioTruncate(ictx, p->oid.name, p->objectno, p->offset, objectx, object_overlap,
                              snapc, snap_id, req_comp);
@@ -3714,9 +3800,18 @@ reprotect_and_return_err:
                          snapc, snap_id, req_comp);
       }
 
+      if (!flag_nonexistent) {
+       ictx->update_object_map(p->objectno, OBJECT_EXISTS);
+      }
+
       r = req->send();
       if (r < 0)
        goto done;
+
+      if (flag_nonexistent) {
+       ictx->update_object_map(p->objectno, p->objectno + 1, OBJECT_NONEXISTENT,
+                               OBJECT_PENDING);
+      }
     }
     r = 0;
   done:
@@ -3800,7 +3895,7 @@ reprotect_and_return_err:
 
          Context *req_comp = new C_RBD_Readahead(ictx, q->oid, q->offset, q->length);
          ictx->readahead.inc_pending();
-         ictx->aio_read_from_cache(q->oid, NULL,
+         ictx->aio_read_from_cache(q->oid, q->objectno, NULL,
                                    q->length, q->offset,
                                    req_comp);
        }
@@ -3874,7 +3969,7 @@ reprotect_and_return_err:
 
        if (ictx->object_cacher) {
          C_CacheRead *cache_comp = new C_CacheRead(req);
-         ictx->aio_read_from_cache(q->oid, &req->data(),
+         ictx->aio_read_from_cache(q->oid, q->objectno, &req->data(),
                                    q->length, q->offset,
                                    cache_comp);
        } else {
index af0bbb5dbc35810024d931650bb5b40df9d7e662..48ddcfcb3e44dbbe3377f9f05d1401fe1c93603c 100644 (file)
@@ -54,6 +54,7 @@ enum {
 };
 
 class Context;
+class SimpleThrottle;
 
 namespace librbd {
 
@@ -166,6 +167,8 @@ namespace librbd {
                   ceph::bufferlist& header);
   int tmap_set(librados::IoCtx& io_ctx, const std::string& imgname);
   int tmap_rm(librados::IoCtx& io_ctx, const std::string& imgname);
+  void rollback_object(ImageCtx *ictx, uint64_t snap_id, const string& oid,
+                       SimpleThrottle& throttle);
   int rollback_image(ImageCtx *ictx, uint64_t snap_id,
                     ProgressContext& prog_ctx);
   void image_info(const ImageCtx *ictx, image_info_t& info, size_t info_size);
index d94bd31a3982bd2daae5c9eb9f85ab2c6aa3f514..ee79da733dd761db4a8577370d2658e94edecfd8 100644 (file)
@@ -563,7 +563,9 @@ void ObjectCacher::perf_stop()
 }
 
 /* private */
-ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid, ObjectSet *oset,
+ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
+                                              uint64_t object_no,
+                                              ObjectSet *oset,
                                               object_locator_t &l,
                                               uint64_t truncate_size,
                                               uint64_t truncate_seq)
@@ -574,6 +576,7 @@ ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid, ObjectSet *oset,
   if ((uint32_t)l.pool < objects.size()) {
     if (objects[l.pool].count(oid)) {
       Object *o = objects[l.pool][oid];
+      o->object_no = object_no;
       o->truncate_size = truncate_size;
       o->truncate_seq = truncate_seq;
       return o;
@@ -583,7 +586,8 @@ ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid, ObjectSet *oset,
   }
 
   // create it.
-  Object *o = new Object(this, oid, oset, l, truncate_size, truncate_seq);
+  Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
+                        truncate_seq);
   objects[l.pool][oid] = o;
   ob_lru.lru_insert_top(o);
   return o;
@@ -618,10 +622,10 @@ void ObjectCacher::bh_read(BufferHead *bh)
   C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
                                            bh->start(), bh->length());
   // go
-  writeback_handler.read(bh->ob->get_oid(), bh->ob->get_oloc(),
-                        bh->start(), bh->length(), bh->ob->get_snap(),
-                        &onfinish->bl, bh->ob->truncate_size, bh->ob->truncate_seq,
-                        onfinish);
+  writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
+                        bh->ob->get_oloc(), bh->start(), bh->length(),
+                        bh->ob->get_snap(), &onfinish->bl,
+                        bh->ob->truncate_size, bh->ob->truncate_seq, onfinish);
 
   ++reads_outstanding;
 }
@@ -1042,7 +1046,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
 
     // get Object cache
     sobject_t soid(ex_it->oid, rd->snap);
-    Object *o = get_object(soid, oset, ex_it->oloc, ex_it->truncate_size, oset->truncate_seq);
+    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc, ex_it->truncate_size, oset->truncate_seq);
     touch_ob(o);
 
     // does not exist and no hits?
@@ -1307,7 +1311,8 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
        ++ex_it) {
     // get object cache
     sobject_t soid(ex_it->oid, CEPH_NOSNAP);
-    Object *o = get_object(soid, oset, ex_it->oloc, ex_it->truncate_size, oset->truncate_seq);
+    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
+                          ex_it->truncate_size, oset->truncate_seq);
 
     // map it all into a single bufferhead.
     BufferHead *bh = o->map_write(wr);
index bed12342d07b619457106c720d5ed22fb1fb31a1..f243cf2627d97842e644713cc9902dfdc2b8b6dc 100644 (file)
@@ -170,6 +170,7 @@ class ObjectCacher {
     friend struct ObjectSet;
 
   public:
+    uint64_t object_no;
     ObjectSet *oset;
     xlist<Object*>::item set_item;
     object_locator_t oloc;
@@ -193,11 +194,11 @@ class ObjectCacher {
     Object(const Object& other);
     const Object& operator=(const Object& other);
 
-    Object(ObjectCacher *_oc, sobject_t o, ObjectSet *os, object_locator_t& l,
-          uint64_t ts, uint64_t tq) :
+    Object(ObjectCacher *_oc, sobject_t o, uint64_t ono, ObjectSet *os,
+          object_locator_t& l, uint64_t ts, uint64_t tq) :
       ref(0),
       oc(_oc),
-      oid(o), oset(os), set_item(this), oloc(l),
+      oid(o), object_no(ono), oset(os), set_item(this), oloc(l),
       truncate_size(ts), truncate_seq(tq),
       complete(false), exists(true),
       last_write_tid(0), last_commit_tid(0),
@@ -218,6 +219,7 @@ class ObjectCacher {
     snapid_t get_snap() { return oid.snap; }
     ObjectSet *get_object_set() { return oset; }
     string get_namespace() { return oloc.nspace; }
+    uint64_t get_object_number() const { return object_no; }
     
     object_locator_t& get_oloc() { return oloc; }
     void set_object_locator(object_locator_t& l) { oloc = l; }
@@ -373,8 +375,9 @@ class ObjectCacher {
     return NULL;
   }
 
-  Object *get_object(sobject_t oid, ObjectSet *oset, object_locator_t &l,
-                    uint64_t truncate_size, uint64_t truncate_seq);
+  Object *get_object(sobject_t oid, uint64_t object_no, ObjectSet *oset,
+                    object_locator_t &l, uint64_t truncate_size,
+                    uint64_t truncate_seq);
   void close_object(Object *ob);
 
   // bh stats
index 4869837306858521218495afa3b199ce8df92230..caf20959b8a49b271dc7de5eb207a6e5bfae1dfd 100644 (file)
@@ -12,10 +12,10 @@ class WritebackHandler {
   WritebackHandler() {}
   virtual ~WritebackHandler() {}
 
-  virtual void read(const object_t& oid, const object_locator_t& oloc,
-                   uint64_t off, uint64_t len, snapid_t snapid,
-                   bufferlist *pbl, uint64_t trunc_size,  __u32 trunc_seq,
-                   Context *onfinish) = 0;
+  virtual void read(const object_t& oid, uint64_t object_no,
+                   const object_locator_t& oloc, uint64_t off, uint64_t len,
+                   snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
+                   __u32 trunc_seq, Context *onfinish) = 0;
   /**
    * check if a given extent read result may change due to a write
    *
index 42567cbe21f9c164a7e8052e31536ee95737d35e..d444ff6d0c18ae73212d9de277febf8c400ee82d 100644 (file)
@@ -58,7 +58,7 @@ FakeWriteback::~FakeWriteback()
   delete m_finisher;
 }
 
-void FakeWriteback::read(const object_t& oid,
+void FakeWriteback::read(const object_t& oid, uint64_t object_no,
                         const object_locator_t& oloc,
                         uint64_t off, uint64_t len, snapid_t snapid,
                         bufferlist *pbl, uint64_t trunc_size,
index 2b7fbd679116d025a0718f2c73c4f17b6d3622e5..ef2bb3d1f3b677b34c10fc9bbf9c11b3320f0ead 100644 (file)
@@ -17,10 +17,10 @@ public:
   FakeWriteback(CephContext *cct, Mutex *lock, uint64_t delay_ns);
   virtual ~FakeWriteback();
 
-  virtual void read(const object_t& oid, const object_locator_t& oloc,
-                   uint64_t off, uint64_t len, snapid_t snapid,
-                   bufferlist *pbl, uint64_t trunc_size,  __u32 trunc_seq,
-                   Context *onfinish);
+  virtual void read(const object_t& oid, uint64_t object_no,
+                   const object_locator_t& oloc, uint64_t off, uint64_t len,
+                   snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
+                   __u32 trunc_seq, Context *onfinish);
 
   virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
                           uint64_t off, uint64_t len,
index 337c874432f9c9e7c39e612898469e63c5643e57..4d3cf9d4cfbbcdb53195ced125114ee97aeb9bf3 100644 (file)
@@ -1029,5 +1029,5 @@ class TestExclusiveLock(object):
             eq(image1.is_exclusive_lock_owner(), False)
             eq(image2.is_exclusive_lock_owner(), True)
             for offset in [0, IMG_SIZE / 2]:
-                read = image2.read(0, 256)
+                read = image2.read(offset, 256)
                 eq(data, read)
index 6a6e1f21939ba8e05c79ec6900a54dc22f83902d..f46fd7bf19af645b2747e817c2b95a4ceeaf0e00 100755 (executable)
@@ -35,7 +35,7 @@ run_cli_tests
 export RBD_CREATE_ARGS="--format 2"
 run_cli_tests
 
-for i in 0 1 5
+for i in 0 1 5 13
 do
     export RBD_FEATURES=$i
     run_api_tests