From: Rishabh Dave Date: Thu, 7 Jun 2018 12:26:44 +0000 (+0000) Subject: ceph-volume-client: allow atomic updates for RADOS objects X-Git-Tag: v12.2.9~63^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c3f9c972297c4d73a901453e806c16044e570667;p=ceph.git ceph-volume-client: allow atomic updates for RADOS objects put_object_versioned() takes the version of the object and verifies if the version of the object is the expected one before updating the data in the object. This verification of version before actually writing makes put_objcet_version() atomic. Rest of the changes include adding get_object_and_version() so that current version of the object can be obtained and modification of get_object() and put_object() to use get_object_and_version() and put_object_versioned() respectively. Fixes: http://tracker.ceph.com/issues/24173 Signed-off-by: Rishabh Dave (cherry picked from commit ca7253cff6cdac590bb14d0d297c02452bf75bf6) --- diff --git a/src/pybind/ceph_volume_client.py b/src/pybind/ceph_volume_client.py index d38f72be9a3..c43681bef21 100644 --- a/src/pybind/ceph_volume_client.py +++ b/src/pybind/ceph_volume_client.py @@ -205,6 +205,7 @@ CEPHFSVOLUMECLIENT_VERSION_HISTORY = """ * 1 - Initial version * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient * 3 - Allow volumes to be created without RADOS namespace isolation + * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient """ @@ -228,7 +229,7 @@ class CephFSVolumeClient(object): """ # Current version - version = 3 + version = 4 # Where shall we create our volumes? POOL_PREFIX = "fsvolume_" @@ -1403,15 +1404,40 @@ class CephFSVolumeClient(object): :param data: data to write :type data: bytes """ + return self.put_object_versioned(pool_name, object_name, data) + + def put_object_versioned(self, pool_name, object_name, data, version=None): + """ + Synchronously write data to an object only if version of the object + version matches the expected version. + + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + :param data: data to write + :type data: bytes + :param version: expected version of the object to write + :type version: int + """ ioctx = self.rados.open_ioctx(pool_name) + max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024 if len(data) > max_size: msg = ("Data to be written to object '{0}' exceeds " "{1} bytes".format(object_name, max_size)) log.error(msg) raise CephFSVolumeClientError(msg) + try: - ioctx.write_full(object_name, data) + with rados.WriteOpCtx(ioctx) as wop: + if version is not None: + wop.assert_version(version) + wop.write_full(data) + ioctx.operate_write_op(wop, object_name) + except rados.OSError as e: + log.error(e) + raise e finally: ioctx.close() @@ -1426,6 +1452,19 @@ class CephFSVolumeClient(object): :returns: bytes - data read from object """ + return self.get_object_and_version(pool_name, object_name)[0] + + def get_object_and_version(self, pool_name, object_name): + """ + Synchronously read data from object and get its version. + + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + + :returns: tuple of object data and version + """ ioctx = self.rados.open_ioctx(pool_name) max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024 try: @@ -1434,9 +1473,10 @@ class CephFSVolumeClient(object): (ioctx.read(object_name, 1, offset=max_size))): log.warning("Size of object {0} exceeds '{1}' bytes " "read".format(object_name, max_size)) + obj_version = ioctx.get_last_version() finally: ioctx.close() - return bytes_read + return (bytes_read, obj_version) def delete_object(self, pool_name, object_name): ioctx = self.rados.open_ioctx(pool_name) diff --git a/src/pybind/rados/rados.pyx b/src/pybind/rados/rados.pyx index e9829937a11..c0df28645b8 100644 --- a/src/pybind/rados/rados.pyx +++ b/src/pybind/rados/rados.pyx @@ -284,6 +284,7 @@ cdef extern from "rados/librados.h" nogil: void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category) void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len) void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len) + void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver) void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset) void rados_write_op_remove(rados_write_op_t write_op) void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset) @@ -1941,6 +1942,19 @@ cdef class WriteOp(object): with nogil: rados_write_op_write(self.write_op, _to_write, length, _offset) + @requires(('version', int)) + def assert_version(self, version): + """ + Check if object's version is the expected one. + :param version: expected version of the object + :param type: int + """ + cdef: + uint64_t _version = version + + with nogil: + rados_write_op_assert_version(self.write_op, _version) + @requires(('offset', int), ('length', int)) def zero(self, offset, length): """