elif isinstance(export.fsal, RGWFSAL):
rgwfsal = cast(RGWFSAL, export.fsal)
- ret, out, err = self.mgr.tool_exec(
- ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path]
- )
- if ret:
- raise NFSException(f'Failed to fetch owner for bucket {export.path}')
- j = json.loads(out)
- owner = j.get('owner', '')
- rgwfsal.user_id = owner
+ if not rgwfsal.user_id:
+ assert export.path
+ ret, out, err = self.mgr.tool_exec(
+ ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path]
+ )
+ if ret:
+ raise NFSException(f'Failed to fetch owner for bucket {export.path}')
+ j = json.loads(out)
+ owner = j.get('owner', '')
+ rgwfsal.user_id = owner
+ assert rgwfsal.user_id
ret, out, err = self.mgr.tool_exec([
- 'radosgw-admin', 'user', 'info', '--uid', owner
+ 'radosgw-admin', 'user', 'info', '--uid', rgwfsal.user_id
])
if ret:
raise NFSException(
- f'Failed to fetch key for bucket {export.path} owner {owner}'
+ f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}'
)
j = json.loads(out)
raise NFSInvalidOperation("export must specify pseudo path")
path = ex_dict.get("path")
- if not path:
+ if path is None:
raise NFSInvalidOperation("export must specify path")
path = self.format_path(path)
fsal = ex_dict.get("fsal", {})
fsal_type = fsal.get("name")
if fsal_type == NFS_GANESHA_SUPPORTED_FSALS[1]:
- if '/' in path:
- raise NFSInvalidOperation('"/" is not allowed in path (bucket name)')
- uid = f'nfs.{cluster_id}.{path}'
- if "user_id" in fsal and fsal["user_id"] != uid:
- raise NFSInvalidOperation(f"export FSAL user_id must be '{uid}'")
+ if '/' in path and path != '/':
+ raise NFSInvalidOperation('"/" is not allowed in path with bucket name')
elif fsal_type == NFS_GANESHA_SUPPORTED_FSALS[0]:
fs_name = fsal.get("fs_name")
if not fs_name:
return 0, "", "Export already exists"
def create_rgw_export(self,
- bucket: str,
cluster_id: str,
pseudo_path: str,
access_type: str,
read_only: bool,
squash: str,
+ bucket: Optional[str] = None,
+ user_id: Optional[str] = None,
clients: list = []) -> Tuple[int, str, str]:
pseudo_path = self.format_path(pseudo_path)
+ if not bucket and not user_id:
+ return -errno.EINVAL, "", "Must specify either bucket or user_id"
+
if not self._fetch_export(cluster_id, pseudo_path):
export = self.create_export_from_dict(
cluster_id,
self._gen_export_id(cluster_id),
{
"pseudo": pseudo_path,
- "path": bucket,
+ "path": bucket or '/',
"access_type": access_type,
"squash": squash,
- "fsal": {"name": NFS_GANESHA_SUPPORTED_FSALS[1]},
+ "fsal": {
+ "name": NFS_GANESHA_SUPPORTED_FSALS[1],
+ "user_id": user_id,
+ },
"clients": clients,
}
)
] if enable else []
def mock_exec(cls, args):
+ if args[1:3] == ['bucket', 'stats']:
+ bucket_info = {
+ "owner": "bucket_owner_user",
+ }
+ return 0, json.dumps(bucket_info), ''
u = {
"user_id": "abc",
"display_name": "foo",
assert len(exports) == 1
assert exports[0].export_id == 1
- def test_create_export_rgw(self):
- self._do_mock_test(self._do_test_create_export_rgw)
+ def test_create_export_rgw_bucket(self):
+ # Run the bucket-based RGW export creation test under the common mock harness.
+ self._do_mock_test(self._do_test_create_export_rgw_bucket)
+
+ def _do_test_create_export_rgw_bucket(self):
+ # Create an RGW export for a named bucket without specifying a FSAL
+ # user_id, so the user must be resolved from the bucket owner (the
+ # mocked 'radosgw-admin bucket stats' reports "bucket_owner_user").
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+
+ # Baseline: two exports exist before the new one is created.
+ exports = conf.list_exports(cluster_id=self.cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 2
+
+ r = conf.create_export(
+ fsal_type='rgw',
+ cluster_id=self.cluster_id,
+ bucket='bucket',
+ pseudo_path='/mybucket',
+ read_only=False,
+ squash='root',
+ addr=["192.168.0.0/16"]
+ )
+ # create_export returns a (retcode, out, err) tuple; 0 means success.
+ assert r[0] == 0
+
+ # The new export should now appear alongside the original two.
+ exports = conf.list_exports(cluster_id=self.cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 3
+
+ export = conf._fetch_export('foo', '/mybucket')
+ assert export.export_id
+ # Path is the bucket name when a bucket was given explicitly.
+ assert export.path == "bucket"
+ assert export.pseudo == "/mybucket"
+ assert export.access_type == "none"
+ assert export.squash == "none"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ # user_id was filled in from the mocked bucket-stats owner lookup.
+ assert export.fsal.user_id == "bucket_owner_user"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash == 'root'
+ assert export.clients[0].access_type == 'rw'
+ assert export.clients[0].addresses == ["192.168.0.0/16"]
+ assert export.cluster_id == self.cluster_id
+
+ def test_create_export_rgw_bucket_user(self):
+ # Run the bucket-plus-explicit-user RGW export test under the mock harness.
+ self._do_mock_test(self._do_test_create_export_rgw_bucket_user)
- def _do_test_create_export_rgw(self):
+ def _do_test_create_export_rgw_bucket_user(self):
nfs_mod = Module('nfs', '', '')
conf = ExportMgr(nfs_mod)
fsal_type='rgw',
cluster_id=self.cluster_id,
bucket='bucket',
+ user_id='other_user',
pseudo_path='/mybucket',
read_only=False,
squash='root',
assert export.fsal.secret_access_key == "the_secret_key"
assert len(export.clients) == 1
assert export.clients[0].squash == 'root'
+ assert export.fsal.user_id == "other_user"
assert export.clients[0].access_type == 'rw'
assert export.clients[0].addresses == ["192.168.0.0/16"]
assert export.cluster_id == self.cluster_id
+
+ def test_create_export_rgw_user(self):
+ # Run the user-only (no bucket) RGW export test under the mock harness.
+ self._do_mock_test(self._do_test_create_export_rgw_user)
+
+ def _do_test_create_export_rgw_user(self):
+ # Create an RGW export with only a user_id and no bucket: the export
+ # path should default to "/" and no bucket-owner lookup is required.
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+
+ # Baseline: two exports exist before the new one is created.
+ exports = conf.list_exports(cluster_id=self.cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 2
+
+ r = conf.create_export(
+ fsal_type='rgw',
+ cluster_id=self.cluster_id,
+ user_id='some_user',
+ pseudo_path='/mybucket',
+ read_only=False,
+ squash='root',
+ addr=["192.168.0.0/16"]
+ )
+ # create_export returns a (retcode, out, err) tuple; 0 means success.
+ assert r[0] == 0
+ exports = conf.list_exports(cluster_id=self.cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 3
+
+ export = conf._fetch_export('foo', '/mybucket')
+ assert export.export_id
+ # Path defaults to "/" when no bucket was specified.
+ assert export.path == "/"
+ assert export.pseudo == "/mybucket"
+ assert export.access_type == "none"
+ assert export.squash == "none"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash == 'root'
+ # The explicitly supplied user_id is used as-is (no owner lookup).
+ assert export.fsal.user_id == "some_user"
+ assert export.clients[0].access_type == 'rw'
+ assert export.clients[0].addresses == ["192.168.0.0/16"]
+ assert export.cluster_id == self.cluster_id
+
def test_create_export_cephfs(self):
+ # Run the CephFS export creation test under the common mock harness.
self._do_mock_test(self._do_test_create_export_cephfs)