def export_cluster_checker(func: FuncT) -> FuncT:
- def cluster_check(fs_export: 'ExportMgr', *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
+ def cluster_check(
+ fs_export: 'ExportMgr',
+ *args: Any,
+ **kwargs: Any
+ ) -> Tuple[int, str, str]:
"""
This method checks if cluster exists and sets rados namespace.
"""
return cast(FuncT, cluster_check)
-def exception_handler(exception_obj: Exception, log_msg: str = "") -> Tuple[int, str, str]:
+def exception_handler(
+ exception_obj: Exception,
+ log_msg: str = ""
+) -> Tuple[int, str, str]:
if log_msg:
log.exception(log_msg)
return getattr(exception_obj, 'errno', -1), "", str(exception_obj)
class ExportMgr:
- def __init__(self, mgr: 'Module', export_ls: Optional[Dict[str, List[Export]]] = None) -> None:
+ def __init__(
+ self,
+ mgr: 'Module',
+ export_ls: Optional[Dict[str, List[Export]]] = None
+ ) -> None:
self.mgr = mgr
self.rados_pool = POOL_NAME
self._exports: Optional[Dict[str, List[Export]]] = export_ls
log.info(f"Exports parsed successfully {self.exports.items()}")
return self._exports
- def _fetch_export(self, cluster_id: str, pseudo_path: Optional[str]) -> Optional[Export]:
+ def _fetch_export(
+ self,
+ cluster_id: str,
+ pseudo_path: Optional[str]
+ ) -> Optional[Export]:
try:
for ex in self.exports[cluster_id]:
if ex.pseudo == pseudo_path:
f'conf-nfs.{export.cluster_id}'
)
- def _delete_export(self, cluster_id: str, pseudo_path: Optional[str], export_obj: Optional[Export] = None) -> Tuple[int, str, str]:
+ def _delete_export(
+ self,
+ cluster_id: str,
+ pseudo_path: Optional[str],
+ export_obj: Optional[Export] = None
+ ) -> Tuple[int, str, str]:
try:
if export_obj:
export: Optional[Export] = export_obj
try:
with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
ioctx.set_namespace(cluster_id)
- export = Export.from_export_block(GaneshaConfParser(ioctx.read(f"export-{ex_id}"
- ).decode("utf-8")).parse()[0], cluster_id)
+ export = Export.from_export_block(
+ GaneshaConfParser(
+ ioctx.read(f"export-{ex_id}").decode("utf-8")
+ ).parse()[0],
+ cluster_id
+ )
return export
except ObjectNotFound:
log.exception(f"Export ID: {ex_id} not found")
return exception_handler(e, f"Failed to list exports for {cluster_id}")
@export_cluster_checker
- def get_export(self, cluster_id: str, pseudo_path: Optional[str]) -> Tuple[int, str, str]:
+ def get_export(
+ self,
+ cluster_id: str,
+ pseudo_path: Optional[str]
+ ) -> Tuple[int, str, str]:
try:
export = self._fetch_export(cluster_id, pseudo_path)
if export:
super().__init__(export_mgr_obj.mgr,
export_mgr_obj._exports)
- def _update_user_id(self, cluster_id: str, path: str, access_type: str, fs_name: str, user_id: str) -> None:
+ def _update_user_id(
+ self,
+ cluster_id: str,
+ path: str,
+ access_type: str,
+ fs_name: str,
+ user_id: str
+ ) -> None:
osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
self.rados_pool, cluster_id, fs_name)
access_type = 'r' if access_type == 'RO' else 'rw'
log.info(f"Export user updated {user_id}")
- def _create_user_key(self, cluster_id: str, entity: str, path: str, fs_name: str, fs_ro: bool) -> Tuple[str, str]:
+ def _create_user_key(
+ self,
+ cluster_id: str,
+ entity: str,
+ path: str,
+ fs_name: str,
+ fs_ro: bool
+ ) -> Tuple[str, str]:
osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
self.rados_pool, cluster_id, fs_name)
access_type = 'r' if fs_ro else 'rw'
if not self._fetch_export(cluster_id, pseudo_path):
ex_id = self._gen_export_id(cluster_id)
user_id = f"nfs.{cluster_id}.{ex_id}"
- user_out, key = self._create_user_key(cluster_id, user_id, path, fs_name, read_only)
+ user_out, key = self._create_user_key(
+ cluster_id, user_id, path, fs_name, read_only
+ )
if clients:
access_type = "none"
elif read_only:
return (0, json.dumps(result, indent=4), '')
return 0, "", "Export already exists"
- def update_export_1(self, cluster_id: str, new_export: Dict, can_create: bool) -> Tuple[int, str, str]:
-
+ def update_export_1(
+ self,
+ cluster_id: str,
+ new_export: Dict,
+ can_create: bool
+ ) -> Tuple[int, str, str]:
for k in ['cluster_id', 'path', 'pseudo']:
if k not in new_export:
raise NFSInvalidOperation(f'Export missing required field {k}')