]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ceph-volume-client: allow atomic updates for RADOS objects
authorRishabh Dave <rishabhddave@gmail.com>
Thu, 7 Jun 2018 12:26:44 +0000 (12:26 +0000)
committerRishabh Dave <ridave@redhat.com>
Thu, 28 Jun 2018 15:24:57 +0000 (15:24 +0000)
put_object_versioned() takes the version of the object and verifies if
the version of the object is the expected one before updating the data
in the object. This verification of version before actually writing
makes put_objcet_version() atomic.

Rest of the changes include adding get_object_and_version() so that
current version of the object can be obtained and modification of
get_object() and put_object() to use get_object_and_version() and
put_object_versioned() respectively.

Fixes: http://tracker.ceph.com/issues/24173
Signed-off-by: Rishabh Dave <ridave@redhat.com>
src/pybind/ceph_volume_client.py
src/pybind/rados/rados.pyx

index 627bf50f00ae0f061eac51b3b285aa3909657091..2d0efe5e64b716e2ccc3a365dd9b5318984be73c 100644 (file)
@@ -205,6 +205,7 @@ CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
     * 1 - Initial version
     * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
     * 3 - Allow volumes to be created without RADOS namespace isolation
+    * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient
 """
 
 
@@ -228,7 +229,7 @@ class CephFSVolumeClient(object):
     """
 
     # Current version
-    version = 3
+    version = 4
 
     # Where shall we create our volumes?
     POOL_PREFIX = "fsvolume_"
@@ -1407,15 +1408,40 @@ class CephFSVolumeClient(object):
         :param data: data to write
         :type data: bytes
         """
+        return self.put_object_versioned(pool_name, object_name, data)
+
+    def put_object_versioned(self, pool_name, object_name, data, version=None):
+        """
+        Synchronously write data to an object only if version of the object
+        version matches the expected version.
+
+        :param pool_name: name of the pool
+        :type pool_name: str
+        :param object_name: name of the object
+        :type object_name: str
+        :param data: data to write
+        :type data: bytes
+        :param version: expected version of the object to write
+        :type version: int
+        """
         ioctx = self.rados.open_ioctx(pool_name)
+
         max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
         if len(data) > max_size:
             msg = ("Data to be written to object '{0}' exceeds "
                    "{1} bytes".format(object_name, max_size))
             log.error(msg)
             raise CephFSVolumeClientError(msg)
+
         try:
-            ioctx.write_full(object_name, data)
+            with rados.WriteOpCtx(ioctx) as wop:
+                if version is not None:
+                    wop.assert_version(version)
+                wop.write_full(data)
+                ioctx.operate_write_op(wop, object_name)
+        except rados.OSError as e:
+            log.error(e)
+            raise e
         finally:
             ioctx.close()
 
@@ -1430,6 +1456,19 @@ class CephFSVolumeClient(object):
 
         :returns: bytes - data read from object
         """
+        return self.get_object_and_version(pool_name, object_name)[0]
+
+    def get_object_and_version(self, pool_name, object_name):
+        """
+        Synchronously read data from object and get its version.
+
+        :param pool_name: name of the pool
+        :type pool_name: str
+        :param object_name: name of the object
+        :type object_name: str
+
+        :returns: tuple of object data and version
+        """
         ioctx = self.rados.open_ioctx(pool_name)
         max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
         try:
@@ -1438,9 +1477,10 @@ class CephFSVolumeClient(object):
                     (ioctx.read(object_name, 1, offset=max_size))):
                 log.warning("Size of object {0} exceeds '{1}' bytes "
                             "read".format(object_name, max_size))
+            obj_version = ioctx.get_last_version()
         finally:
             ioctx.close()
-        return bytes_read
+        return (bytes_read, obj_version)
 
     def delete_object(self, pool_name, object_name):
         ioctx = self.rados.open_ioctx(pool_name)
index a795fd7344e6c2baaecc91a2c372a415cd2ab2d7..20ee1aa343745a23214b17d9548d61fe73466e53 100644 (file)
@@ -282,6 +282,7 @@ cdef extern from "rados/librados.h" nogil:
     void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
     void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
     void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
+    void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver)
     void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
     void rados_write_op_remove(rados_write_op_t write_op)
     void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
@@ -1999,6 +2000,19 @@ cdef class WriteOp(object):
         with nogil:
             rados_write_op_write(self.write_op, _to_write, length, _offset)
 
+    @requires(('version', int))
+    def assert_version(self, version):
+        """
+        Check if object's version is the expected one.
+        :param version: expected version of the object
+        :param type: int
+        """
+        cdef:
+            uint64_t _version = version
+
+        with nogil:
+            rados_write_op_assert_version(self.write_op, _version)
+
     @requires(('offset', int), ('length', int))
     def zero(self, offset, length):
         """