log.info("Export user created is {}".format(json_res[0]['entity']))
return json_res[0]['entity'], json_res[0]['key']
- def create_export(self, fs_name, cluster_id, pseudo_path, read_only, path, squash):
+ def create_export(self, fs_name, cluster_id, pseudo_path, read_only, path, squash,
+ clients=[]):
if not check_fs(self.mgr, fs_name):
raise FSNotFound(fs_name)
ex_id = self._gen_export_id()
user_id = f"{cluster_id}{ex_id}"
user_out, key = self._create_user_key(user_id, path, fs_name, read_only)
- access_type = "RW"
- if read_only:
+ if clients:
+ access_type = "none"
+ elif read_only:
access_type = "RO"
+ else:
+ access_type = "RW"
ex_dict = {
'path': self.format_path(path),
'pseudo': pseudo_path,
'squash': squash,
'fsal': {"name": "CEPH", "user_id": user_id,
"fs_name": fs_name, "sec_label_xattr": ""},
- 'clients': []
+ 'clients': clients
}
export = Export.from_dict(ex_id, ex_dict)
export.fsal.cephx_key = key
fsname: str,
clusterid: str,
binding: str,
- path: str = '/',
- readonly: bool = False,
+ path: Optional[str] = '/',
+ readonly: Optional[bool] = False,
+ addr: Optional[str] = None,
squash: str = 'none',
) -> Tuple[int, str, str]:
"""Create a cephfs export"""
# TODO Extend export creation for rgw.
+ clients = []
+ if addr:
+ clients = [{
+ 'addresses': [addr],
+ 'access_type': 'ro' if readonly else 'rw',
+ 'squash': squash,
+ }]
+ squash = 'none'
return self.export_mgr.create_export(fsal_type='cephfs', fs_name=fsname,
cluster_id=clusterid, pseudo_path=binding,
read_only=readonly, path=path,
- squash=squash)
+ squash=squash, clients=clients)
@CLICommand('nfs export rm', perm='rw')
def _cmd_nfs_export_rm(self, clusterid: str, binding: str) -> Tuple[int, str, str]: