stdin=json.dumps(exports),
stdout=StringIO(), stderr=StringIO())
+ def update_export(self, cluster_id, path, pseudo, fs_name):
+ self.ctx.cluster.run(args=['ceph', 'nfs', 'export', 'apply',
+ cluster_id, '-i', '-'],
+ stdin=json.dumps({
+ "path": path,
+ "pseudo": pseudo,
+ "squash": "none",
+ "access_type": "rw",
+ "protocols": [4],
+ "fsal": {
+ "name": "CEPH",
+ "fs_name": fs_name
+ }
+ }))
+
def test_create_and_delete_cluster(self):
'''
Test successful creation and deletion of the nfs cluster.
finally:
self._delete_cluster_with_fs(self.fs_name, mnt_pt)
self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}'])
+
+ def test_cephfs_export_update_with_nonexistent_dir(self):
+ """
+ Test that invalid path is not allowed while updating a CephFS
+ export.
+ """
+ self._create_cluster_with_fs(self.fs_name)
+ self._create_export(export_id=1)
+
+ try:
+ self.update_export(self.cluster_id, "/not_existent_dir",
+ self.pseudo_path, self.fs_name)
+ except CommandFailedError as e:
+ if e.exitstatus != errno.ENOENT:
+ raise
+
+ self._delete_export()
+ self._delete_cluster_with_fs(self.fs_name)
+
+ def test_cephfs_export_update_at_non_dir_path(self):
+ """
+ Test that non-directory path are not allowed while updating a CephFS
+ export.
+ """
+ mnt_pt = '/mnt'
+ preserve_mode = self._sys_cmd(['stat', '-c', '%a', mnt_pt])
+ self._create_cluster_with_fs(self.fs_name, mnt_pt)
+ try:
+ self.ctx.cluster.run(args=['touch', f'{mnt_pt}/testfile'])
+ self._create_export(export_id=1)
+
+ # test at a file path
+ try:
+ self.update_export(self.cluster_id, "/testfile",
+ self.pseudo_path, self.fs_name)
+ except CommandFailedError as e:
+ if e.exitstatus != errno.ENOTDIR:
+ raise
+
+ # test at a symlink path
+ self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir'])
+ self.ctx.cluster.run(args=['ln', '-s', f'{mnt_pt}/testdir',
+ f'{mnt_pt}/testdir_symlink'])
+ try:
+ self.update_export(self.cluster_id, "/testdir_symlink",
+ self.pseudo_path, self.fs_name)
+ except CommandFailedError as e:
+ if e.exitstatus != errno.ENOTDIR:
+ raise
+
+ # verify the path wasn't changed
+ export = json.loads(self._nfs_cmd("export", "ls",
+ self.cluster_id, "--detailed"))
+ self.assertEqual(export[0]["pseudo"], "/cephfs")
+
+ finally:
+ self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}/*'])
+ self._delete_cluster_with_fs(self.fs_name, mnt_pt, preserve_mode)