From 8b2bd387859bdc2287ba6dcaba27d8b234984a5a Mon Sep 17 00:00:00 2001 From: Ramana Raja Date: Wed, 13 Sep 2017 19:53:43 +0530 Subject: [PATCH] pybind/ceph_volume_client: add get, put, and delete object interfaces Wrap low-level rados APIs to allow ceph_volume_client to get, put, and delete objects. The interfaces would allow OpenStack Manila's cephfs driver to store config data in a shared storage to implement highly available Manila deployments. Restrict write(put) and read(get) object sizes to 'osd_max_size' config setting. Signed-off-by: Ramana Raja (cherry picked from commit d1bd171d6b6eb00c47168f38cec1a30f9c9f02bd) --- qa/tasks/cephfs/test_volume_client.py | 74 ++++++++++++++++++++++++++- src/pybind/ceph_volume_client.py | 58 ++++++++++++++++++++- 2 files changed, 129 insertions(+), 3 deletions(-) diff --git a/qa/tasks/cephfs/test_volume_client.py b/qa/tasks/cephfs/test_volume_client.py index 65dc9a9eb8561..d50fa55c0da88 100644 --- a/qa/tasks/cephfs/test_volume_client.py +++ b/qa/tasks/cephfs/test_volume_client.py @@ -764,7 +764,7 @@ vc.disconnect() # auth ID belongs to, the auth ID's authorized access levels # for different volumes, versioning details, etc. expected_auth_metadata = { - u"version": 1, + u"version": 2, u"compat_version": 1, u"dirty": False, u"tenant_id": u"tenant1", @@ -791,7 +791,7 @@ vc.disconnect() # Verify that the volume metadata file stores info about auth IDs # and their access levels to the volume, versioning details, etc. expected_vol_metadata = { - u"version": 1, + u"version": 2, u"compat_version": 1, u"auths": { u"guest": { @@ -905,3 +905,73 @@ vc.disconnect() volume_id=volume_id, auth_id=guestclient["auth_id"], ))) + + def test_put_object(self): + vc_mount = self.mounts[1] + vc_mount.umount_wait() + self._configure_vc_auth(vc_mount, "manila") + + obj_data = 'test data' + obj_name = 'test_vc_obj_1' + pool_name = self.fs.get_data_pool_names()[0] + + self._volume_client_python(vc_mount, dedent(""" + vc.put_object("{pool_name}", "{obj_name}", b"{obj_data}") + """.format( + pool_name = pool_name, + obj_name = obj_name, + obj_data = obj_data + ))) + + read_data = self.fs.rados(['get', obj_name, '-'], pool=pool_name) + self.assertEqual(obj_data, read_data) + + def test_get_object(self): + vc_mount = self.mounts[1] + vc_mount.umount_wait() + self._configure_vc_auth(vc_mount, "manila") + + obj_data = 'test_data' + obj_name = 'test_vc_ob_2' + pool_name = self.fs.get_data_pool_names()[0] + + self.fs.rados(['put', obj_name, '-'], pool=pool_name, stdin_data=obj_data) + + self._volume_client_python(vc_mount, dedent(""" + data_read = vc.get_object("{pool_name}", "{obj_name}") + assert data_read == b"{obj_data}" + """.format( + pool_name = pool_name, + obj_name = obj_name, + obj_data = obj_data + ))) + + def test_delete_object(self): + vc_mount = self.mounts[1] + vc_mount.umount_wait() + self._configure_vc_auth(vc_mount, "manila") + + obj_data = 'test data' + obj_name = 'test_vc_obj_3' + pool_name = self.fs.get_data_pool_names()[0] + + self.fs.rados(['put', obj_name, '-'], pool=pool_name, stdin_data=obj_data) + + self._volume_client_python(vc_mount, dedent(""" + data_read = vc.delete_object("{pool_name}", "{obj_name}") + """.format( + pool_name = pool_name, + obj_name = obj_name, + ))) + + with self.assertRaises(CommandFailedError): + self.fs.rados(['stat', obj_name], pool=pool_name) + + # Check idempotency -- no error raised trying to delete non-existent + # object + self._volume_client_python(vc_mount, dedent(""" + data_read = vc.delete_object("{pool_name}", "{obj_name}") + """.format( + pool_name = pool_name, + obj_name = obj_name, + ))) diff --git a/src/pybind/ceph_volume_client.py b/src/pybind/ceph_volume_client.py index 4221893752643..e2d22b743905b 100644 --- a/src/pybind/ceph_volume_client.py +++ b/src/pybind/ceph_volume_client.py @@ -204,6 +204,7 @@ CEPHFSVOLUMECLIENT_VERSION_HISTORY = """ CephFSVolumeClient Version History: * 1 - Initial version + * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient """ @@ -228,7 +229,7 @@ class CephFSVolumeClient(object): """ # Current version - version = 1 + version = 2 # Where shall we create our volumes? POOL_PREFIX = "fsvolume_" @@ -1339,3 +1340,58 @@ class CephFSVolumeClient(object): src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name) self._cp_r(src_snapshot_path, dest_fs_path) + + def put_object(self, pool_name, object_name, data): + """ + Synchronously write data to an object. + + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + :param data: data to write + :type data: bytes + """ + ioctx = self.rados.open_ioctx(pool_name) + max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024 + if len(data) > max_size: + msg = ("Data to be written to object '{0}' exceeds " + "{1} bytes".format(object_name, max_size)) + log.error(msg) + raise CephFSVolumeClientError(msg) + try: + ioctx.write_full(object_name, data) + finally: + ioctx.close() + + def get_object(self, pool_name, object_name): + """ + Synchronously read data from object. + + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + + :returns: bytes - data read from object + """ + ioctx = self.rados.open_ioctx(pool_name) + max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024 + try: + bytes_read = ioctx.read(object_name, max_size) + if ((len(bytes_read) == max_size) and + (ioctx.read(object_name, 1, offset=max_size))): + log.warning("Size of object {0} exceeds '{1}' bytes " + "read".format(object_name, max_size)) + finally: + ioctx.close() + return bytes_read + + def delete_object(self, pool_name, object_name): + ioctx = self.rados.open_ioctx(pool_name) + try: + ioctx.remove_object(object_name) + except rados.ObjectNotFound: + log.warn("Object '{0}' was already removed".format(object_name)) + finally: + ioctx.close() -- 2.39.5