* 1 - Initial version
* 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
* 3 - Allow volumes to be created without RADOS namespace isolation
+ * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient
"""
"""
# Current version
- version = 3
+ version = 4
# Where shall we create our volumes?
POOL_PREFIX = "fsvolume_"
:param data: data to write
:type data: bytes
"""
+ return self.put_object_versioned(pool_name, object_name, data)
+
+ def put_object_versioned(self, pool_name, object_name, data, version=None):
+ """
+ Synchronously write data to an object only if version of the object
+ version matches the expected version.
+
+ :param pool_name: name of the pool
+ :type pool_name: str
+ :param object_name: name of the object
+ :type object_name: str
+ :param data: data to write
+ :type data: bytes
+ :param version: expected version of the object to write
+ :type version: int
+ """
ioctx = self.rados.open_ioctx(pool_name)
+
max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
if len(data) > max_size:
msg = ("Data to be written to object '{0}' exceeds "
"{1} bytes".format(object_name, max_size))
log.error(msg)
raise CephFSVolumeClientError(msg)
+
try:
- ioctx.write_full(object_name, data)
+ with rados.WriteOpCtx(ioctx) as wop:
+ if version is not None:
+ wop.assert_version(version)
+ wop.write_full(data)
+ ioctx.operate_write_op(wop, object_name)
+ except rados.OSError as e:
+ log.error(e)
+ raise e
finally:
ioctx.close()
:returns: bytes - data read from object
"""
+ return self.get_object_and_version(pool_name, object_name)[0]
+
+ def get_object_and_version(self, pool_name, object_name):
+ """
+ Synchronously read data from object and get its version.
+
+ :param pool_name: name of the pool
+ :type pool_name: str
+ :param object_name: name of the object
+ :type object_name: str
+
+ :returns: tuple of object data and version
+ """
ioctx = self.rados.open_ioctx(pool_name)
max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
try:
(ioctx.read(object_name, 1, offset=max_size))):
log.warning("Size of object {0} exceeds '{1}' bytes "
"read".format(object_name, max_size))
+ obj_version = ioctx.get_last_version()
finally:
ioctx.close()
- return bytes_read
+ return (bytes_read, obj_version)
def delete_object(self, pool_name, object_name):
ioctx = self.rados.open_ioctx(pool_name)
void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
+ void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver)
void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
void rados_write_op_remove(rados_write_op_t write_op)
void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
with nogil:
rados_write_op_write(self.write_op, _to_write, length, _offset)
+ @requires(('version', int))
+ def assert_version(self, version):
+ """
+ Check if object's version is the expected one.
+ :param version: expected version of the object
+ :param type: int
+ """
+ cdef:
+ uint64_t _version = version
+
+ with nogil:
+ rados_write_op_assert_version(self.write_op, _version)
+
@requires(('offset', int), ('length', int))
def zero(self, offset, length):
"""