]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: refactor existing object map code to its own class
author Jason Dillaman <dillaman@redhat.com>
Tue, 27 Jan 2015 03:12:24 +0000 (22:12 -0500)
committer Jason Dillaman <dillaman@redhat.com>
Thu, 29 Jan 2015 02:12:53 +0000 (21:12 -0500)
In preparation for switching all current object map manipulations
within librados callbacks to AIO, the existing object map code
has been moved to its own class for future maintainability.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/AioRequest.cc
src/librbd/CopyupRequest.cc
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/ImageWatcher.cc
src/librbd/LibrbdWriteback.cc
src/librbd/Makefile.am
src/librbd/ObjectMap.cc [new file with mode: 0644]
src/librbd/ObjectMap.h [new file with mode: 0644]
src/librbd/internal.cc

index ea898e8b6f08e3eba0bba14b86f86ced87c5c78d..2efde981d6274c7ce4dcc3a0ab29958009f102ed 100644 (file)
@@ -12,6 +12,7 @@
 
 #include "librbd/AioRequest.h"
 #include "librbd/CopyupRequest.h"
+#include "librbd/ObjectMap.h"
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
@@ -196,9 +197,13 @@ namespace librbd {
     ldout(m_ictx->cct, 20) << "send " << this << " " << m_oid << " " << m_object_off << "~" << m_object_len << dendl;
 
     // send read request to parent if the object doesn't exist locally
-    if (!m_ictx->object_may_exist(m_object_no)) {
-      complete(-ENOENT);
-      return 0;
+    {
+      RWLock::RLocker l(m_ictx->md_lock);
+      if (m_ictx->object_map != NULL &&
+         !m_ictx->object_map->object_may_exist(m_object_no)) {
+       complete(-ENOENT);
+       return 0;
+      }
     }
 
     librados::AioCompletion *rados_completion =
index 1d158eb5910206e6e2f400a226a4dc038b966d18..8328fa556ec6e4c4aa4a0a35bac523bed9afbeb6 100644 (file)
@@ -11,6 +11,7 @@
 
 #include "librbd/AioRequest.h"
 #include "librbd/CopyupRequest.h"
+#include "librbd/ObjectMap.h"
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
@@ -72,12 +73,18 @@ namespace librbd {
     std::vector<librados::snap_t> snaps;
     snaps.insert(snaps.end(), snapc.snaps.begin(), snapc.snaps.end());
 
-    r = m_ictx->update_object_map(m_object_no, OBJECT_EXISTS);
-    if (r < 0) {
-      lderr(m_ictx->cct) << __func__ << " " << this
-                        << ": failed to update object map:"
-                        << cpp_strerror(r) << dendl;
-      return;
+    {
+      RWLock::RLocker l(m_ictx->md_lock);
+      if (m_ictx->object_map != NULL) {
+       r = m_ictx->object_map->update(m_object_no, OBJECT_EXISTS,
+                                      boost::optional<uint8_t>());
+       if (r < 0) {
+         lderr(m_ictx->cct) << __func__ << " " << this
+                            << ": failed to update object map:"
+                            << cpp_strerror(r) << dendl;
+         return;
+       }
+      }
     }
 
     librados::ObjectWriteOperation copyup_op;
index 0eb2b2cd4ce2e67f3832687af96561a5898deb61..9fa8689b4ad01f950673ba0f4f9ad9799e5a9a45 100644 (file)
@@ -7,12 +7,11 @@
 #include "common/errno.h"
 #include "common/perf_counters.h"
 
-#include "cls/lock/cls_lock_client.h"
-
 #include "librbd/internal.h"
 
 #include "librbd/ImageCtx.h"
 #include "librbd/ImageWatcher.h"
+#include "librbd/ObjectMap.h"
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
@@ -61,7 +60,7 @@ namespace librbd {
       object_cacher(NULL), writeback_handler(NULL), object_set(NULL),
       readahead(),
       total_bytes_read(0), copyup_finisher(NULL),
-      pending_aio(0)
+      pending_aio(0), object_map(NULL)
   {
     md_ctx.dup(p);
     data_ctx.dup(p);
@@ -131,6 +130,7 @@ namespace librbd {
       delete copyup_finisher;
       copyup_finisher = NULL;
     }
+    delete object_map;
     delete[] format_string;
   }
 
@@ -581,9 +581,7 @@ namespace librbd {
   }
 
   void ImageCtx::shutdown_cache() {
-    md_lock.get_write();
     invalidate_cache();
-    md_lock.put_write();
     object_cacher->stop();
   }
 
@@ -686,226 +684,4 @@ namespace librbd {
       copyup_list_cond.Wait(copyup_list_lock);
     }
   }
-
-  int ImageCtx::lock_object_map()
-  {
-    if ((features & RBD_FEATURE_OBJECT_MAP) == 0) {
-      return 0;
-    }
-
-    int r;
-    bool broke_lock = false;
-    while (true) {
-      ldout(cct, 10) << "locking object map" << dendl;
-      r = rados::cls::lock::lock(&md_ctx, object_map_name(id), RBD_LOCK_NAME,
-                                 LOCK_EXCLUSIVE, "", "", "", utime_t(), 0);
-      if (r == 0) {
-        break;
-      } else if (broke_lock || r != -EBUSY) {
-        lderr(cct) << "failed to lock object map: " << cpp_strerror(r) << dendl;
-        return r;
-      }
-
-      typedef std::map<rados::cls::lock::locker_id_t,
-                       rados::cls::lock::locker_info_t> lockers_t;
-      lockers_t lockers;
-      ClsLockType lock_type;
-      std::string lock_tag;
-      int r = rados::cls::lock::get_lock_info(&md_ctx, object_map_name(id),
-                                              RBD_LOCK_NAME, &lockers,
-                                              &lock_type, &lock_tag);
-      if (r == -ENOENT) {
-        continue;
-      } else if (r < 0) {
-        lderr(cct) << "failed to list object map locks: " << cpp_strerror(r)
-                   << dendl;
-        return r;
-      }
-
-      ldout(cct, 10) << "breaking current object map lock" << dendl;
-      for (lockers_t::iterator it = lockers.begin();
-           it != lockers.end(); ++it) {
-        const rados::cls::lock::locker_id_t &locker = it->first;
-        r = rados::cls::lock::break_lock(&md_ctx, object_map_name(id),
-                                         RBD_LOCK_NAME, locker.cookie,
-                                         locker.locker);
-        if (r < 0 && r != -ENOENT) {
-          lderr(cct) << "failed to break object map lock: " << cpp_strerror(r)
-                     << dendl;
-          return r;
-        }
-      }
-
-
-
-      broke_lock = true;
-    }
-    return 0;
-  }
-
-  int ImageCtx::unlock_object_map()
-  {
-    if ((features & RBD_FEATURE_OBJECT_MAP) == 0) {
-      return 0;
-    }
-
-    int r = rados::cls::lock::unlock(&md_ctx, object_map_name(id),
-                                     RBD_LOCK_NAME, "");
-    if (r < 0 && r != -ENOENT) {
-      lderr(cct) << "failed to release object map lock: " << cpp_strerror(r)
-                 << dendl;
-    }
-    return r;
-  }
-
-  bool ImageCtx::object_may_exist(uint64_t object_no) const
-  {
-    // Fall back to default logic if object map is disabled or invalid
-    if ((features & RBD_FEATURE_OBJECT_MAP) == 0 ||
-        ((flags & RBD_FLAG_OBJECT_MAP_INVALID) != 0)) {
-      return true;
-    }
-
-    RWLock::RLocker l(object_map_lock);
-    assert(object_no < object_map.size());
-    return (object_map[object_no] == OBJECT_EXISTS ||
-           object_map[object_no] == OBJECT_PENDING);
-  }
-
-  int ImageCtx::refresh_object_map()
-  {
-    if ((features & RBD_FEATURE_OBJECT_MAP) == 0) {
-      return 0;
-    }
-
-    RWLock::WLocker l(object_map_lock);
-    int r = cls_client::object_map_load(&data_ctx, object_map_name(id),
-                                       &object_map);
-    if (r < 0) {
-      lderr(cct) << "error refreshing object map: " << cpp_strerror(r)
-                << dendl;
-      invalidate_object_map();
-      object_map.clear();
-      return r;
-    }
-
-    ldout(cct, 20) << "refreshed object map: " << object_map.size()
-                   << dendl;
-
-    uint64_t num_objs = Striper::get_num_objects(layout, get_current_size());
-    if (object_map.size() != num_objs) {
-      // resize op might have been interrupted
-      lderr(cct) << "incorrect object map size: " << object_map.size()
-                << " != " << num_objs << dendl;
-      invalidate_object_map();
-      return -EINVAL;
-    }
-    return 0;
-  }
-
-  int ImageCtx::resize_object_map(uint8_t default_object_state)
-  {
-    if ((features & RBD_FEATURE_OBJECT_MAP) == 0) {
-      return 0;
-    }
-
-    RWLock::WLocker l(object_map_lock);
-    uint64_t num_objs = Striper::get_num_objects(layout, get_current_size());
-    ldout(cct, 20) << "resizing object map: " << num_objs << dendl;
-    librados::ObjectWriteOperation op;
-    rados::cls::lock::assert_locked(&op, RBD_LOCK_NAME, LOCK_EXCLUSIVE, "", "");
-    cls_client::object_map_resize(&op, num_objs, default_object_state);
-    int r = data_ctx.operate(object_map_name(id), &op);
-    if (r == -EBUSY) {
-      lderr(cct) << "object map lock not owned by client" << dendl;
-      return r;
-    } else if (r < 0) {
-      lderr(cct) << "error resizing object map: size=" << num_objs << ", "
-                 << "state=" << default_object_state << ", "
-                 << "error=" << cpp_strerror(r) << dendl;
-      invalidate_object_map();
-      return 0;
-    }
-
-    size_t orig_object_map_size = object_map.size();
-    object_map.resize(num_objs);
-    for (uint64_t i = orig_object_map_size; i < object_map.size(); ++i) {
-      object_map[i] = default_object_state;
-    }
-    return 0;
-  }
-
-  int ImageCtx::update_object_map(uint64_t object_no, uint8_t object_state)
-  {
-    return update_object_map(object_no, object_no + 1, object_state,
-                            boost::optional<uint8_t>());
-  }
-
-  int ImageCtx::update_object_map(uint64_t start_object_no,
-                                  uint64_t end_object_no, uint8_t new_state,
-                                 const boost::optional<uint8_t> &current_state)
-  {
-    if ((features & RBD_FEATURE_OBJECT_MAP) == 0) {
-      return 0;
-    }
-
-    RWLock::WLocker l(object_map_lock);
-    assert(start_object_no <= end_object_no);
-    assert(end_object_no <= object_map.size() ||
-          (flags & RBD_FLAG_OBJECT_MAP_INVALID) != 0);
-    if (end_object_no > object_map.size()) {
-      ldout(cct, 20) << "skipping update of invalid object map" << dendl;
-      return 0;
-    }
-
-    bool update_required = false;
-    for (uint64_t object_no = start_object_no; object_no < end_object_no;
-        ++object_no) {
-      if ((!current_state || object_map[object_no] == *current_state) &&
-         object_map[object_no] != new_state) {
-       update_required = true;
-       break;
-      }
-    }
-
-    if (!update_required) {
-      return 0;
-    }
-
-    ldout(cct, 20) << "updating object map: [" << start_object_no << ","
-                  << end_object_no << ") = "
-                  << static_cast<uint32_t>(new_state) << dendl;
-
-    librados::ObjectWriteOperation op;
-    rados::cls::lock::assert_locked(&op, RBD_LOCK_NAME, LOCK_EXCLUSIVE, "", "");
-    cls_client::object_map_update(&op, start_object_no, end_object_no,
-                                  new_state, current_state);
-    int r = data_ctx.operate(object_map_name(id), &op);
-    if (r == -EBUSY) {
-      lderr(cct) << "object map lock not owned by client" << dendl;
-      return r;
-    } else if (r < 0) {
-      lderr(cct) << "object map update failed: " << cpp_strerror(r) << dendl;
-      invalidate_object_map();
-    } else {
-      for (uint64_t object_no = start_object_no; object_no < end_object_no;
-           ++object_no) {
-       if (!current_state || object_map[object_no] == *current_state) {
-         object_map[object_no] = new_state;
-        }
-      }
-    }
-    return 0;
-  }
-
-  void ImageCtx::invalidate_object_map()
-  {
-    flags |= RBD_FLAG_OBJECT_MAP_INVALID;
-    int r = cls_client::set_flags(&md_ctx, header_oid, flags,
-                                  RBD_FLAG_OBJECT_MAP_INVALID);
-    if (r < 0) {
-      lderr(cct) << "Failed to invalidate object map: " << cpp_strerror(r)
-                 << dendl;
-    }
-  }
 }
index 5ece66d828386cbfea9ae7347b6cf6191b49053b..c7aa8cc3e36cf224d7f19631fef33be18f39098e 100644 (file)
@@ -15,7 +15,6 @@
 #include "common/Mutex.h"
 #include "common/Readahead.h"
 #include "common/RWLock.h"
-#include "common/bit_vector.hpp"
 #include "common/snap_types.h"
 #include "include/buffer.h"
 #include "include/rbd/librbd.hpp"
@@ -36,6 +35,7 @@ namespace librbd {
 
   class ImageWatcher;
   class CopyupRequest;
+  class ObjectMap;
 
   struct ImageCtx {
     CephContext *cct;
@@ -113,7 +113,7 @@ namespace librbd {
     Cond pending_aio_cond;
     uint64_t pending_aio;
 
-    ceph::BitVector<2> object_map;
+    ObjectMap *object_map;
 
     /**
      * Either image_name or image_id must be set.
@@ -181,18 +181,6 @@ namespace librbd {
                                  uint64_t overlap);
     void wait_for_pending_aio();
     void wait_for_pending_copyup();
-
-    /* object map */
-    int lock_object_map();
-    int unlock_object_map();
-    bool object_may_exist(uint64_t object_no) const;
-    int refresh_object_map();
-    int resize_object_map(uint8_t default_object_state);
-    int update_object_map(uint64_t object_no, uint8_t object_state);
-    int update_object_map(uint64_t start_object_no, uint64_t end_object_no,
-                         uint8_t new_state,
-                         const boost::optional<uint8_t> &current_state);
-    void invalidate_object_map();
   };
 }
 
index 6d248d899ed70767c8dd4b1018a01a5c47a43beb..876776ceee783ed7a77ffbda5c39a409a91d9aeb 100644 (file)
@@ -3,6 +3,7 @@
 #include "librbd/ImageWatcher.h"
 #include "librbd/AioCompletion.h"
 #include "librbd/ImageCtx.h"
+#include "librbd/ObjectMap.h"
 #include "cls/lock/cls_lock_client.h"
 #include "cls/lock/cls_lock_types.h"
 #include "include/encoding.h"
@@ -320,12 +321,14 @@ int ImageWatcher::lock() {
   ldout(m_image_ctx.cct, 20) << "acquired exclusive lock" << dendl;
   m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
 
-  r = m_image_ctx.lock_object_map();
-  if (r < 0 && r != -ENOENT) {
-    unlock();
-    return r;
+  if (m_image_ctx.object_map != NULL) {
+    r = m_image_ctx.object_map->lock();
+    if (r < 0 && r != -ENOENT) {
+      unlock();
+      return r;
+    }
+    m_image_ctx.object_map->refresh();
   }
-  m_image_ctx.refresh_object_map();
 
   bufferlist bl;
   ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
@@ -357,7 +360,9 @@ int ImageWatcher::unlock()
     return r;
   }
 
-  m_image_ctx.unlock_object_map();
+  if (m_image_ctx.object_map != NULL) {
+    m_image_ctx.object_map->unlock();
+  }
   notify_released_lock();
   return 0;
 }
index 78e2c2d2dbf39f8fcf236054e809efb50564a956..733e9ec5a95bc0f0656fe41eb7c96fb14dbdc810 100644 (file)
@@ -15,6 +15,7 @@
 #include "librbd/ImageCtx.h"
 #include "librbd/internal.h"
 #include "librbd/LibrbdWriteback.h"
+#include "librbd/ObjectMap.h"
 
 #include "include/assert.h"
 
@@ -104,9 +105,14 @@ namespace librbd {
   {
     // on completion, take the mutex and then call onfinish.
     Context *req = new C_Request(m_ictx->cct, onfinish, &m_lock);
-    if (!m_ictx->object_may_exist(object_no)) {
-      m_finisher->queue(req, -ENOENT);
-      return;
+
+    {
+      RWLock::RLocker l(m_ictx->md_lock);
+      if (m_ictx->object_map != NULL &&
+         !m_ictx->object_map->object_may_exist(object_no)) {
+       m_finisher->queue(req, -ENOENT);
+       return;
+      }
     }
 
     librados::AioCompletion *rados_completion =
index 3a49a12df1b36b8c5aef503b44f828e75149d955..2880bedb69123338a74d74bd69beeb48e19d0514 100644 (file)
@@ -6,7 +6,8 @@ librbd_internal_la_SOURCES = \
        librbd/ImageCtx.cc \
        librbd/ImageWatcher.cc \
        librbd/internal.cc \
-       librbd/LibrbdWriteback.cc
+       librbd/LibrbdWriteback.cc \
+       librbd/ObjectMap.cc
 noinst_LTLIBRARIES += librbd_internal.la
 
 librbd_api_la_SOURCES = \
@@ -43,5 +44,6 @@ noinst_HEADERS += \
        librbd/ImageWatcher.h \
        librbd/internal.h \
        librbd/LibrbdWriteback.h \
+       librbd/ObjectMap.h \
        librbd/parent_types.h \
        librbd/SnapInfo.h
diff --git a/src/librbd/ObjectMap.cc b/src/librbd/ObjectMap.cc
new file mode 100644 (file)
index 0000000..ada967b
--- /dev/null
@@ -0,0 +1,257 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#include "librbd/ObjectMap.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/internal.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "cls/lock/cls_lock_client.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::ObjectMap: "
+
+namespace librbd {
+
+// Bind the object map to the image context it belongs to. Only the
+// reference is stored; no map data is loaded by the constructor.
+ObjectMap::ObjectMap(ImageCtx &image_ctx) : m_image_ctx(image_ctx) {}
+
+int ObjectMap::lock()
+{
+  if ((m_image_ctx.features & RBD_FEATURE_OBJECT_MAP) == 0) {
+    return 0;
+  }
+
+  int r;
+  bool broke_lock = false;
+  CephContext *cct = m_image_ctx.cct;
+  while (true) {
+    ldout(cct, 10) << "locking object map" << dendl;
+    r = rados::cls::lock::lock(&m_image_ctx.md_ctx,
+                              object_map_name(m_image_ctx.id),
+                              RBD_LOCK_NAME, LOCK_EXCLUSIVE, "", "", "",
+                              utime_t(), 0);
+    if (r == 0) {
+      break;
+    } else if (broke_lock || r != -EBUSY) {
+      lderr(cct) << "failed to lock object map: " << cpp_strerror(r) << dendl;
+      return r;
+    }
+
+    typedef std::map<rados::cls::lock::locker_id_t,
+                     rados::cls::lock::locker_info_t> lockers_t;
+    lockers_t lockers;
+    ClsLockType lock_type;
+    std::string lock_tag;
+    int r = rados::cls::lock::get_lock_info(&m_image_ctx.md_ctx,
+                                           object_map_name(m_image_ctx.id),
+                                            RBD_LOCK_NAME, &lockers,
+                                            &lock_type, &lock_tag);
+    if (r == -ENOENT) {
+      continue;
+    } else if (r < 0) {
+      lderr(cct) << "failed to list object map locks: " << cpp_strerror(r)
+                 << dendl;
+      return r;
+    }
+
+    ldout(cct, 10) << "breaking current object map lock" << dendl;
+    for (lockers_t::iterator it = lockers.begin();
+         it != lockers.end(); ++it) {
+      const rados::cls::lock::locker_id_t &locker = it->first;
+      r = rados::cls::lock::break_lock(&m_image_ctx.md_ctx,
+                                      object_map_name(m_image_ctx.id),
+                                       RBD_LOCK_NAME, locker.cookie,
+                                       locker.locker);
+      if (r < 0 && r != -ENOENT) {
+        lderr(cct) << "failed to break object map lock: " << cpp_strerror(r)
+                   << dendl;
+        return r;
+      }
+    }
+
+
+
+    broke_lock = true;
+  }
+  return 0;
+}
+
+int ObjectMap::unlock()
+{
+  if ((m_image_ctx.features & RBD_FEATURE_OBJECT_MAP) == 0) {
+    return 0;
+  }
+
+  int r = rados::cls::lock::unlock(&m_image_ctx.md_ctx,
+                                  object_map_name(m_image_ctx.id),
+                                   RBD_LOCK_NAME, "");
+  if (r < 0 && r != -ENOENT) {
+    lderr(m_image_ctx.cct) << "failed to release object map lock: "
+                          << cpp_strerror(r) << dendl;
+  }
+  return r;
+}
+
+// Report whether the object with the given number may exist.
+//
+// Returns true unconditionally when the object map feature is disabled or
+// the map is flagged invalid, because the map cannot be consulted then.
+// Otherwise returns true only for entries in state OBJECT_EXISTS or
+// OBJECT_PENDING. object_no must be within the in-memory map (asserted).
+bool ObjectMap::object_may_exist(uint64_t object_no) const
+{
+  // object map disabled: fall back to the default logic
+  if ((m_image_ctx.features & RBD_FEATURE_OBJECT_MAP) == 0) {
+    return true;
+  }
+  // object map invalid: fall back to the default logic as well
+  if ((m_image_ctx.flags & RBD_FLAG_OBJECT_MAP_INVALID) != 0) {
+    return true;
+  }
+
+  RWLock::RLocker l(m_image_ctx.object_map_lock);
+  assert(object_no < object_map.size());
+  if (object_map[object_no] == OBJECT_EXISTS) {
+    return true;
+  }
+  return object_map[object_no] == OBJECT_PENDING;
+}
+
+int ObjectMap::refresh()
+{ 
+  if ((m_image_ctx.features & RBD_FEATURE_OBJECT_MAP) == 0) {
+    return 0;
+  }
+  
+  RWLock::WLocker l(m_image_ctx.object_map_lock);
+  CephContext *cct = m_image_ctx.cct;
+  int r = cls_client::object_map_load(&m_image_ctx.data_ctx,
+                                     object_map_name(m_image_ctx.id),
+                                      &object_map);
+  if (r < 0) { 
+    lderr(cct) << "error refreshing object map: " << cpp_strerror(r)
+               << dendl;
+    invalidate();
+    object_map.clear();
+    return r;
+  }
+  
+  ldout(cct, 20) << "refreshed object map: " << object_map.size()
+                 << dendl;
+  
+  uint64_t num_objs = Striper::get_num_objects(m_image_ctx.layout,
+                                              m_image_ctx.get_current_size());
+  if (object_map.size() != num_objs) {
+    // resize op might have been interrupted
+    lderr(cct) << "incorrect object map size: " << object_map.size()
+               << " != " << num_objs << dendl;
+    invalidate();
+    return -EINVAL;
+  }
+  return 0;
+}
+int ObjectMap::resize(uint8_t default_object_state)
+{
+  if ((m_image_ctx.features & RBD_FEATURE_OBJECT_MAP) == 0) {
+    return 0;
+  }
+
+  RWLock::WLocker l(m_image_ctx.object_map_lock);
+  CephContext *cct = m_image_ctx.cct;
+  uint64_t num_objs = Striper::get_num_objects(m_image_ctx.layout,
+                                               m_image_ctx.get_current_size());
+  ldout(cct, 20) << "resizing object map: " << num_objs << dendl;
+  librados::ObjectWriteOperation op;
+  rados::cls::lock::assert_locked(&op, RBD_LOCK_NAME, LOCK_EXCLUSIVE, "", "");
+  cls_client::object_map_resize(&op, num_objs, default_object_state);
+  int r = m_image_ctx.data_ctx.operate(object_map_name(m_image_ctx.id), &op);
+  if (r == -EBUSY) {
+    lderr(cct) << "object map lock not owned by client" << dendl;
+    return r;
+  } else if (r < 0) {
+    lderr(cct) << "error resizing object map: size=" << num_objs << ", "
+               << "state=" << default_object_state << ", "
+               << "error=" << cpp_strerror(r) << dendl;
+    invalidate();
+    return 0;
+  }
+
+  size_t orig_object_map_size = object_map.size();
+  object_map.resize(num_objs);
+  for (uint64_t i = orig_object_map_size; i < object_map.size(); ++i) {
+    object_map[i] = default_object_state;
+  }
+  return 0;
+}
+
+// Single-object convenience overload: forwards to the range update with the
+// half-open range [object_no, object_no + 1) and returns its result.
+int ObjectMap::update(uint64_t object_no, uint8_t new_state,
+                      const boost::optional<uint8_t> &current_state)
+{
+  const uint64_t end_object_no = object_no + 1;
+  return update(object_no, end_object_no, new_state, current_state);
+}
+
+int ObjectMap::update(uint64_t start_object_no,
+                      uint64_t end_object_no, uint8_t new_state,
+                      const boost::optional<uint8_t> &current_state)
+{
+  if ((m_image_ctx.features & RBD_FEATURE_OBJECT_MAP) == 0) {
+    return 0;
+  }
+
+  RWLock::WLocker l(m_image_ctx.object_map_lock);
+  CephContext *cct = m_image_ctx.cct;
+  assert(start_object_no <= end_object_no);
+  assert(end_object_no <= object_map.size() ||
+         (m_image_ctx.flags & RBD_FLAG_OBJECT_MAP_INVALID) != 0);
+  if (end_object_no > object_map.size()) {
+    ldout(cct, 20) << "skipping update of invalid object map" << dendl;
+    return 0;
+  }
+
+  bool update_required = false;
+  for (uint64_t object_no = start_object_no; object_no < end_object_no;
+       ++object_no) {
+    if ((!current_state || object_map[object_no] == *current_state) &&
+        object_map[object_no] != new_state) {
+      update_required = true;
+      break;
+    }
+  }
+
+  if (!update_required) {
+    return 0;
+  }
+
+  ldout(cct, 20) << "updating object map: [" << start_object_no << ","
+                 << end_object_no << ") = "
+                 << static_cast<uint32_t>(new_state) << dendl;
+
+  librados::ObjectWriteOperation op;
+  rados::cls::lock::assert_locked(&op, RBD_LOCK_NAME, LOCK_EXCLUSIVE, "", "");
+  cls_client::object_map_update(&op, start_object_no, end_object_no,
+                                new_state, current_state);
+  int r = m_image_ctx.data_ctx.operate(object_map_name(m_image_ctx.id), &op);
+  if (r == -EBUSY) {
+    lderr(cct) << "object map lock not owned by client" << dendl;
+    return r;
+  } else if (r < 0) {
+    lderr(cct) << "object map update failed: " << cpp_strerror(r) << dendl;
+    invalidate();
+  } else {
+    for (uint64_t object_no = start_object_no; object_no < end_object_no;
+         ++object_no) {
+      if (!current_state || object_map[object_no] == *current_state) {
+        object_map[object_no] = new_state;
+      }
+    }
+  }
+  return 0;
+}
+
+void ObjectMap::invalidate()
+{
+  // TODO: md_lock
+  m_image_ctx.flags |= RBD_FLAG_OBJECT_MAP_INVALID;
+  int r = cls_client::set_flags(&m_image_ctx.md_ctx,
+                               m_image_ctx.header_oid,
+                               m_image_ctx.flags,
+                                RBD_FLAG_OBJECT_MAP_INVALID);
+  if (r < 0) {
+    lderr(m_image_ctx.cct) << "Failed to invalidate object map: "
+                          << cpp_strerror(r) << dendl;
+  }
+}
+
+} // namespace librbd
diff --git a/src/librbd/ObjectMap.h b/src/librbd/ObjectMap.h
new file mode 100644 (file)
index 0000000..da426d1
--- /dev/null
@@ -0,0 +1,44 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef CEPH_LIBRBD_OBJECT_MAP_H
+#define CEPH_LIBRBD_OBJECT_MAP_H
+
+#include "include/int_types.h"
+#include "common/bit_vector.hpp"
+#include <boost/optional.hpp>
+
+namespace librbd {
+
+// ImageCtx is defined as a struct; declare it with the same class-key
+struct ImageCtx;
+
+// Object map handling for a single image: keeps the in-memory copy of the
+// per-object states and performs the lock, load, resize and update
+// operations against the object map object of the bound image.
+class ObjectMap {
+public:
+
+  // Binds the map to image_ctx. Only a reference is stored, so image_ctx
+  // must outlive this object. No map data is loaded here; see refresh().
+  explicit ObjectMap(ImageCtx &image_ctx);
+
+  // Acquire / release the exclusive cls lock on the object map object.
+  // Both return 0 when the object map feature is disabled, otherwise 0 or a
+  // negative errno. lock() breaks the current lockers when the lock is busy.
+  int lock();
+  int unlock();
+
+  // Returns false only if the map is enabled, valid and marks the object as
+  // neither existing nor pending; returns true in every other case.
+  bool object_may_exist(uint64_t object_no) const;
+
+  // Reload the in-memory map from the object map object. Returns 0 or a
+  // negative errno; failures flag the object map invalid.
+  int refresh();
+
+  // Resize the map to the image's current size; entries added in memory are
+  // set to default_object_state. Returns 0, or -EBUSY if the lock is not
+  // held.
+  int resize(uint8_t default_object_state);
+
+  // Set objects to new_state, either a single object or the half-open range
+  // [start_object_no, end_object_no). When current_state is set, only
+  // entries equal to it are changed. Returns 0, or -EBUSY if the lock is
+  // not held.
+  int update(uint64_t object_no, uint8_t new_state,
+             const boost::optional<uint8_t> &current_state);
+  int update(uint64_t start_object_no, uint64_t end_object_no,
+             uint8_t new_state, const boost::optional<uint8_t> &current_state);
+
+private:
+
+  // image this map belongs to (not owned)
+  ImageCtx &m_image_ctx;
+
+  // in-memory per-object states, indexed by object number
+  ceph::BitVector<2> object_map;
+
+  // flag the object map invalid in memory and in the image header
+  void invalidate();
+
+};
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_OBJECT_MAP_H
index db13d68e0fc426df3fa2b9a0ab2de4c5dbd398c1..a200d183e64dc0b602d2175fdeb3883e0bb65d3e 100644 (file)
@@ -23,6 +23,7 @@
 #include "librbd/ImageWatcher.h"
 
 #include "librbd/internal.h"
+#include "librbd/ObjectMap.h"
 #include "librbd/parent_types.h"
 #include "include/util.h"
 
@@ -1775,8 +1776,10 @@ reprotect_and_return_err:
        return;
       }
 
-      m_ictx->size = m_new_size;
-      m_ictx->resize_object_map(OBJECT_NONEXISTENT);
+      if (m_ictx->object_map != NULL) {
+       m_ictx->size = m_new_size;
+       m_ictx->object_map->resize(OBJECT_NONEXISTENT);
+      }
     }
 
   private:
@@ -1832,16 +1835,20 @@ reprotect_and_return_err:
     }
 
     virtual int send() {
+      {
+       RWLock::RLocker l(m_ictx->md_lock);
+       if (m_ictx->object_map != NULL &&
+           !m_ictx->object_map->object_may_exist(m_object_no)) {
+         return 1;
+       }
+      }
+
       RWLock::RLocker l(m_ictx->owner_lock);
       if (m_ictx->image_watcher->is_lock_supported() &&
           !m_ictx->image_watcher->is_lock_owner()) {
         return -ERESTART;
       }
 
-      if (!m_ictx->object_may_exist(m_object_no)) {
-       return 1;
-      }
-
       string oid = m_ictx->get_object_name(m_object_no);
       librados::AioCompletion *rados_completion =
        librados::Rados::aio_create_completion(this, NULL, rados_ctx_cb);
@@ -1880,8 +1887,12 @@ reprotect_and_return_err:
        return;
       }
 
-      m_ictx->update_object_map(m_delete_start, m_num_objects,
-                               OBJECT_NONEXISTENT, OBJECT_PENDING);
+      RWLock::RLocker l2(m_ictx->md_lock);
+      if (m_ictx->object_map != NULL) {
+       m_ictx->object_map->update(m_delete_start, m_num_objects,
+                                  OBJECT_NONEXISTENT, OBJECT_PENDING);
+      }
+
       if (m_delete_offset <= m_new_size) {
        m_ctx->complete(r);
        return;
@@ -1904,20 +1915,25 @@ reprotect_and_return_err:
        bool flag_nonexistent = false;
        if (p->offset == 0) {
          flag_nonexistent = true;
-         m_ictx->update_object_map(p->objectno, p->objectno + 1,
-                                   OBJECT_PENDING, OBJECT_EXISTS);
+         if (m_ictx->object_map != NULL) {
+           m_ictx->object_map->update(p->objectno, OBJECT_PENDING,
+                                      OBJECT_EXISTS);
+         }
          m_ictx->data_ctx.aio_remove(p->oid.name, rados_completion);
        } else {
-         m_ictx->update_object_map(p->objectno, OBJECT_EXISTS);
+         if (m_ictx->object_map != NULL) {
+           m_ictx->object_map->update(p->objectno, OBJECT_EXISTS,
+                                      boost::optional<uint8_t>());
+         }
          librados::ObjectWriteOperation op;
          op.truncate(p->offset);
          m_ictx->data_ctx.aio_operate(p->oid.name, rados_completion, &op);
        }
        rados_completion->release();
 
-       if (flag_nonexistent) {
-         m_ictx->update_object_map(p->objectno, p->objectno + 1,
-                                   OBJECT_NONEXISTENT, OBJECT_PENDING);
+       if (flag_nonexistent && m_ictx->object_map != NULL) {
+         m_ictx->object_map->update(p->objectno, OBJECT_NONEXISTENT,
+                                    OBJECT_PENDING);
        }
       }
       completion->finish_adding_requests();
@@ -1958,8 +1974,13 @@ reprotect_and_return_err:
       ldout(cct, 2) << "trim_image objects " << delete_start << " to "
                    << (num_objects - 1) << dendl;
 
-      ictx->update_object_map(delete_start, num_objects, OBJECT_PENDING,
-                             OBJECT_EXISTS);
+      {
+       RWLock::RLocker l(ictx->md_lock);
+       if (ictx->object_map != NULL) {
+         ictx->object_map->update(delete_start, num_objects, OBJECT_PENDING,
+                                  OBJECT_EXISTS);
+       }
+      }
 
       AsyncObjectThrottle::ContextFactory context_factory(
         boost::lambda::bind(boost::lambda::new_ptr<AsyncTrimObjectContext>(),
@@ -2302,10 +2323,13 @@ reprotect_and_return_err:
        ictx->snap_exists = false;
       }
 
-      if (ictx->snap_exists) {
-       r = ictx->refresh_object_map();
-       if (r < 0) {
-         return r;
+      if ((ictx->features & RBD_FEATURE_OBJECT_MAP) == 0) {
+       delete ictx->object_map;
+       ictx->object_map = NULL;
+      } else {
+       ictx->object_map = new ObjectMap(*ictx);
+       if (ictx->snap_exists) {
+         ictx->object_map->refresh();
        }
       }
 
@@ -2591,7 +2615,9 @@ reprotect_and_return_err:
       return r;
     }
 
-    ictx->refresh_object_map();
+    if (ictx->object_map != NULL) {
+      ictx->object_map->refresh();
+    }
     refresh_parent(ictx);
     return 0;
   }
@@ -3698,9 +3724,15 @@ reprotect_and_return_err:
        bl.append(buf + q->first, q->second);
       }
 
-      r = ictx->update_object_map(p->objectno, OBJECT_EXISTS);
-      if (r < 0) {
-       goto done;
+      {
+       RWLock::RLocker l(ictx->md_lock);
+       if (ictx->object_map != NULL) {
+         r = ictx->object_map->update(p->objectno, OBJECT_EXISTS,
+                                      boost::optional<uint8_t>());
+         if (r < 0) {
+           goto done;
+         }
+       }
       }
 
       C_AioWrite *req_comp = new C_AioWrite(cct, c);
@@ -3802,13 +3834,13 @@ reprotect_and_return_err:
        object_overlap = ictx->prune_parent_extents(objectx, overlap);
       }
 
+      RWLock::RLocker l(ictx->md_lock);
       bool flag_nonexistent = false;
       if (p->offset == 0 && p->length == ictx->layout.fl_object_size) {
        req = new AioRemove(ictx, p->oid.name, p->objectno, objectx, object_overlap,
                            snapc, snap_id, req_comp);
-       if (!req->has_parent()) {
-          ictx->update_object_map(p->objectno, p->objectno + 1, OBJECT_PENDING,
-                                 OBJECT_EXISTS);
+       if (!req->has_parent() && ictx->object_map != NULL) {
+          ictx->object_map->update(p->objectno, OBJECT_PENDING, OBJECT_EXISTS);
          flag_nonexistent = true;
        }
       } else if (p->offset + p->length == ictx->layout.fl_object_size) {
@@ -3820,17 +3852,18 @@ reprotect_and_return_err:
                          snapc, snap_id, req_comp);
       }
 
-      if (!flag_nonexistent) {
-       ictx->update_object_map(p->objectno, OBJECT_EXISTS);
+      if (!flag_nonexistent && ictx->object_map != NULL) {
+       ictx->object_map->update(p->objectno, OBJECT_EXISTS,
+                                boost::optional<uint8_t>());
       }
 
       r = req->send();
       if (r < 0)
        goto done;
 
-      if (flag_nonexistent) {
-       ictx->update_object_map(p->objectno, p->objectno + 1, OBJECT_NONEXISTENT,
-                               OBJECT_PENDING);
+      if (flag_nonexistent && ictx->object_map != NULL) {
+       ictx->object_map->update(p->objectno, OBJECT_NONEXISTENT,
+                                OBJECT_PENDING);
       }
     }
     r = 0;