:param check_in: Check specified export id
'''
output = self._cmd('auth', 'ls')
+ client_id = f'client.nfs.{self.cluster_id}'
if check_in:
- self.assertIn(f'client.nfs.{self.cluster_id}.{export_id}', output)
+ self.assertIn(f'{cient_id}.{export_id}', output)
else:
- self.assertNotIn(f'client.nfs.{self.cluster_id}.{export_id}', output)
+ self.assertNotIn(client_id, output)
def _test_idempotency(self, cmd_func, cmd_args):
'''
self.sample_export['export_id'] = 2
self.sample_export['pseudo'] = self.pseudo_path + '1'
self.sample_export['access_type'] = 'RO'
- self.sample_export['fsal']['user_id'] = f'nfs.{self.cluster_id}.2'
+ self.sample_export['fsal']['user_id'] = f'{self.expected_name}.2'
self.assertDictEqual(self.sample_export, nfs_output[1])
# Export-3 for subvolume with r only
self.sample_export['export_id'] = 3
self.sample_export['path'] = sub_vol_path
self.sample_export['pseudo'] = self.pseudo_path + '2'
- self.sample_export['fsal']['user_id'] = f'nfs.{self.cluster_id}.3'
+ self.sample_export['fsal']['user_id'] = f'{self.expected_name}.3'
self.assertDictEqual(self.sample_export, nfs_output[2])
# Export-4 for subvolume
self.sample_export['export_id'] = 4
self.sample_export['pseudo'] = self.pseudo_path + '3'
self.sample_export['access_type'] = 'RW'
- self.sample_export['fsal']['user_id'] = f'nfs.{self.cluster_id}.4'
+ self.sample_export['fsal']['user_id'] = f'{self.expected_name}.4'
self.assertDictEqual(self.sample_export, nfs_output[3])
def _get_export(self):
def export_cluster_checker(func: FuncT) -> FuncT:
def cluster_check(
- fs_export: 'ExportMgr',
+ export: 'ExportMgr',
*args: Any,
**kwargs: Any
) -> Tuple[int, str, str]:
"""
This method checks if cluster exists
"""
- if kwargs['cluster_id'] not in available_clusters(fs_export.mgr):
+ if kwargs['cluster_id'] not in available_clusters(export.mgr):
return -errno.ENOENT, "", "Cluster does not exists"
- return func(fs_export, *args, **kwargs)
+ return func(export, *args, **kwargs)
return cast(FuncT, cluster_check)
def _fetch_export(
self,
cluster_id: str,
- pseudo_path: Optional[str]
+ pseudo_path: str
) -> Optional[Export]:
try:
for ex in self.exports[cluster_id]:
'prefix': 'auth rm',
'entity': 'client.{}'.format(entity),
})
- log.info(f"Export user deleted is {entity}")
+ log.info(f"Deleted export user {entity}")
+
+ def _delete_rgw_user(self, uid: str) -> None:
+ self._exec(['radosgw-admin', 'user', 'rm', '--uid', uid])
+ log.info(f"Deleted export RGW user {uid}")
def _gen_export_id(self, cluster_id: str) -> int:
exports = sorted([ex.export_id for ex in self.exports[cluster_id]])
if export_obj:
export: Optional[Export] = export_obj
else:
+ assert pseudo_path
export = self._fetch_export(cluster_id, pseudo_path)
if export:
self._delete_user(export.fsal.user_id)
if isinstance(export.fsal, RGWFSAL):
assert export.fsal.user_id
- uid = f'nfs.{cluster_id}.{export.path}'
- self._exec(['radosgw-admin', 'user', 'rm', '--uid', uid])
+ self._delete_rgw_user(f'nfs.{cluster_id}.{export.path}')
if not self.exports[cluster_id]:
del self.exports[cluster_id]
return 0, "Successfully deleted export", ""
@export_cluster_checker
def delete_export(self,
- cluster_id: str = '',
- pseudo_path: Optional[str] = None) -> Tuple[int, str, str]:
- assert pseudo_path is not None
+ cluster_id: str,
+ pseudo_path: str) -> Tuple[int, str, str]:
return self._delete_export(cluster_id, pseudo_path)
def delete_all_exports(self, cluster_id: str) -> None:
@export_cluster_checker
def list_exports(self,
- cluster_id: str = '',
+ cluster_id: str,
detailed: bool = False) -> Tuple[int, str, str]:
try:
if detailed:
@export_cluster_checker
def get_export(
self,
- cluster_id: str = '',
- pseudo_path: Optional[str] = None
+ cluster_id: str,
+ pseudo_path: str,
) -> Tuple[int, str, str]:
try:
export = self._fetch_export(cluster_id, pseudo_path)
"path": bucket,
"cluster": cluster_id,
"mode": access_type,
+ "squash": squash,
}
return (0, json.dumps(result, indent=4), '')
return 0, "", "Export already exists"
def update_export_1(
self,
cluster_id: str,
- new_export: Dict,
+ new_export_dict: Dict,
) -> Tuple[int, str, str]:
for k in ['cluster_id', 'path', 'pseudo']:
- if k not in new_export:
+ if k not in new_export_dict:
raise NFSInvalidOperation(f'Export missing required field {k}')
- if new_export['cluster_id'] not in available_clusters(self.mgr):
+ if new_export_dict['cluster_id'] not in available_clusters(self.mgr):
raise ClusterNotFound()
- new_export['path'] = self.format_path(new_export['path'])
- new_export['pseudo'] = self.format_path(new_export['pseudo'])
+ new_export_dict['path'] = self.format_path(new_export_dict['path'])
+ new_export_dict['pseudo'] = self.format_path(new_export_dict['pseudo'])
- old_export = self._fetch_export(new_export['cluster_id'],
- new_export['pseudo'])
+ old_export = self._fetch_export(new_export_dict['cluster_id'],
+ new_export_dict['pseudo'])
if old_export:
# Check if export id matches
- if new_export.get('export_id'):
- if old_export.export_id != new_export.get('export_id'):
+ if new_export_dict.get('export_id'):
+ if old_export.export_id != new_export_dict.get('export_id'):
raise NFSInvalidOperation('Export ID changed, Cannot update export')
else:
- new_export['export_id'] = old_export.export_id
- elif new_export.get('export_id'):
- old_export = self._fetch_export_obj(cluster_id, new_export['export_id'])
+ new_export_dict['export_id'] = old_export.export_id
+ elif new_export_dict.get('export_id'):
+ old_export = self._fetch_export_obj(cluster_id, new_export_dict['export_id'])
if old_export:
# re-fetch via old pseudo
old_export = self._fetch_export(cluster_id, old_export.pseudo)
self.mgr.log.debug(f"export {old_export.export_id} pseudo {old_export.pseudo} -> {new_export_dict['pseudo']}")
new_export = Export.from_dict(
- new_export.get('export_id', self._gen_export_id(cluster_id)),
- new_export
+ new_export_dict.get('export_id', self._gen_export_id(cluster_id)),
+ new_export_dict
)
new_export.validate(self.mgr)
if old_export.fsal.name != new_export.fsal.name:
raise NFSInvalidOperation('FSAL change not allowed')
+ if old_export.pseudo != new_export.pseudo:
+ self.mgr.log.debug(
+ f'export {new_export.export_id} pseudo {old_export.pseudo} -> {new_export.pseudo}'
+ )
if old_export.fsal.name == 'CEPH':
old_fsal = cast(CephFSFSAL, old_export.fsal)