import json
import logging
+try:
+ from typing import Dict, List, Optional
+except ImportError:
+ pass
import cephfs
import orchestrator
self.sec_label_xattr = sec_label_xattr
self.cephx_key = cephx_key
- @classmethod
- def validate_path(cls, path):
- return re.match(r'^/[^><|&()?]*$', path)
-
- def create_path(self, path):
- cfs = CephFS(self.fs_name)
- cfs.mk_dirs(path)
-
@classmethod
def from_fsal_block(cls, fsal_block):
return cls(fsal_block['name'],
return nid
def _persist_daemon_configuration(self):
- daemon_map = {}
+ daemon_map = {} # type: Dict[str, List[Dict[str, str]]]
"""
for daemon_id in self.list_daemons():
daemon_map[daemon_id] = []
self.pool_name = 'nfs-ganesha'
self.pool_ns = cluster_id
self.mgr = mgr
- self.ganeshaconf = ''
+ self.ganeshaconf = None # type: Optional[GaneshaConf]
self.key = ''
def update_user_caps(self):
self.pool_name, self.pool_ns, nodeid, result)
def create_instance(self):
+ assert self.ganeshaconf is not None
self.ganeshaconf = GaneshaConf(self)
def create_export(self):
+ assert self.ganeshaconf is not None
ex_id = self.ganeshaconf.create_export({
'path': "/",
'pseudo': "/cephfs",
return 0, "", "Export Created Successfully"
def delete_export(self, ex_id):
+ assert self.ganeshaconf is not None
if not self.ganeshaconf.has_export(ex_id):
return 0, "No exports available",""
log.info("Export detected for id:{}".format(ex_id))
import errno
import json
+from typing import Optional
+
from mgr_module import MgrModule
import orchestrator
from .fs.volume import VolumeClient
-#from .fs.nfs import check_fsal_valid, create_instance, create_export, delete_export
from .fs.nfs import NFSConfig
class Module(orchestrator.OrchestratorClientMixin, MgrModule):
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self.vc = VolumeClient(self)
- self.nfs_obj = ""
+ self.nfs_obj = None # type: Optional[NFSConfig]
def __del__(self):
self.vc.shutdown()