]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ceph-volume-client: allow atomic updates for RADOS objects
authorRishabh Dave <rishabhddave@gmail.com>
Thu, 7 Jun 2018 12:26:44 +0000 (12:26 +0000)
committerPatrick Donnelly <pdonnell@redhat.com>
Thu, 13 Sep 2018 18:22:59 +0000 (11:22 -0700)
put_object_versioned() takes the version of the object and verifies if
the version of the object is the expected one before updating the data
in the object. This verification of version before actually writing
makes put_objcet_version() atomic.

Rest of the changes include adding get_object_and_version() so that
current version of the object can be obtained and modification of
get_object() and put_object() to use get_object_and_version() and
put_object_versioned() respectively.

Fixes: http://tracker.ceph.com/issues/24173
Signed-off-by: Rishabh Dave <ridave@redhat.com>
(cherry picked from commit ca7253cff6cdac590bb14d0d297c02452bf75bf6)

src/pybind/ceph_volume_client.py
src/pybind/rados/rados.pyx

index d38f72be9a37ab9e7773e1da3e09832ef05cc3b6..c43681bef21ea549b292dbdedb61ab807e75e1dd 100644 (file)
@@ -205,6 +205,7 @@ CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
     * 1 - Initial version
     * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
     * 3 - Allow volumes to be created without RADOS namespace isolation
+    * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient
 """
 
 
@@ -228,7 +229,7 @@ class CephFSVolumeClient(object):
     """
 
     # Current version
-    version = 3
+    version = 4
 
     # Where shall we create our volumes?
     POOL_PREFIX = "fsvolume_"
@@ -1403,15 +1404,40 @@ class CephFSVolumeClient(object):
         :param data: data to write
         :type data: bytes
         """
+        return self.put_object_versioned(pool_name, object_name, data)
+
+    def put_object_versioned(self, pool_name, object_name, data, version=None):
+        """
+        Synchronously write data to an object only if version of the object
+        version matches the expected version.
+
+        :param pool_name: name of the pool
+        :type pool_name: str
+        :param object_name: name of the object
+        :type object_name: str
+        :param data: data to write
+        :type data: bytes
+        :param version: expected version of the object to write
+        :type version: int
+        """
         ioctx = self.rados.open_ioctx(pool_name)
+
         max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
         if len(data) > max_size:
             msg = ("Data to be written to object '{0}' exceeds "
                    "{1} bytes".format(object_name, max_size))
             log.error(msg)
             raise CephFSVolumeClientError(msg)
+
         try:
-            ioctx.write_full(object_name, data)
+            with rados.WriteOpCtx(ioctx) as wop:
+                if version is not None:
+                    wop.assert_version(version)
+                wop.write_full(data)
+                ioctx.operate_write_op(wop, object_name)
+        except rados.OSError as e:
+            log.error(e)
+            raise e
         finally:
             ioctx.close()
 
@@ -1426,6 +1452,19 @@ class CephFSVolumeClient(object):
 
         :returns: bytes - data read from object
         """
+        return self.get_object_and_version(pool_name, object_name)[0]
+
+    def get_object_and_version(self, pool_name, object_name):
+        """
+        Synchronously read data from object and get its version.
+
+        :param pool_name: name of the pool
+        :type pool_name: str
+        :param object_name: name of the object
+        :type object_name: str
+
+        :returns: tuple of object data and version
+        """
         ioctx = self.rados.open_ioctx(pool_name)
         max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
         try:
@@ -1434,9 +1473,10 @@ class CephFSVolumeClient(object):
                     (ioctx.read(object_name, 1, offset=max_size))):
                 log.warning("Size of object {0} exceeds '{1}' bytes "
                             "read".format(object_name, max_size))
+            obj_version = ioctx.get_last_version()
         finally:
             ioctx.close()
-        return bytes_read
+        return (bytes_read, obj_version)
 
     def delete_object(self, pool_name, object_name):
         ioctx = self.rados.open_ioctx(pool_name)
index e9829937a11655f18a03b257d142c6d997d04bf3..c0df28645b805df1cfcc2cd5a0adfba466c341f2 100644 (file)
@@ -284,6 +284,7 @@ cdef extern from "rados/librados.h" nogil:
     void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
     void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
     void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
+    void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver)
     void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
     void rados_write_op_remove(rados_write_op_t write_op)
     void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
@@ -1941,6 +1942,19 @@ cdef class WriteOp(object):
         with nogil:
             rados_write_op_write(self.write_op, _to_write, length, _offset)
 
+    @requires(('version', int))
+    def assert_version(self, version):
+        """
+        Check if object's version is the expected one.
+        :param version: expected version of the object
+        :param type: int
+        """
+        cdef:
+            uint64_t _version = version
+
+        with nogil:
+            rados_write_op_assert_version(self.write_op, _version)
+
     @requires(('offset', int), ('length', int))
     def zero(self, offset, length):
         """