# Add created obj url to common config obj
ioctx.append(config_obj, GaneshaConfParser.write_block(
self._create_url_block(obj)).encode('utf-8'))
- FSExport._check_rados_notify(ioctx, config_obj)
+ ExportMgr._check_rados_notify(ioctx, config_obj)
log.debug(f"Added {obj} url to {config_obj}")
def update_obj(self, conf_block, obj, config_obj):
ioctx.write_full(obj, conf_block.encode('utf-8'))
log.debug("write configuration into rados object "
f"{self.pool}/{self.namespace}/{obj}:\n{conf_block}")
- FSExport._check_rados_notify(ioctx, config_obj)
+ ExportMgr._check_rados_notify(ioctx, config_obj)
log.debug(f"Update export {obj} in {config_obj}")
def remove_obj(self, obj, config_obj):
export_urls = export_urls.replace(url.encode('utf-8'), b'')
ioctx.remove_object(obj)
ioctx.write_full(config_obj, export_urls)
- FSExport._check_rados_notify(ioctx, config_obj)
+ ExportMgr._check_rados_notify(ioctx, config_obj)
log.debug("Object deleted: {}".format(url))
def remove_all_obj(self):
ValidateExport._client(client)
-class FSExport(object):
- def __init__(self, mgr, namespace=None):
+# Renamed from FSExport: FSAL-agnostic manager for NFS exports.
+class ExportMgr:
+ def __init__(self, mgr, namespace=None, export_ls=None):
+ # export_ls optionally pre-seeds the cached export list (presumably
+ # lazily loaded elsewhere when left as None — TODO confirm).
self.mgr = mgr
self.rados_pool = POOL_NAME
self.rados_namespace = namespace
- self._exports = None
+ self._exports = export_ls
@staticmethod
def _check_rados_notify(ioctx, obj):
except KeyError:
pass
- def _create_user_key(self, entity, path, fs_name, fs_ro):
- osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
- self.rados_pool, self.rados_namespace, fs_name)
- access_type = 'r' if fs_ro else 'rw'
-
- ret, out, err = self.mgr.check_mon_command({
- 'prefix': 'auth get-or-create',
- 'entity': 'client.{}'.format(entity),
- 'caps': ['mon', 'allow r', 'osd', osd_cap, 'mds', 'allow {} path={}'.format(
- access_type, path)],
- 'format': 'json',
- })
-
- json_res = json.loads(out)
- log.info("Export user created is {}".format(json_res[0]['entity']))
- return json_res[0]['entity'], json_res[0]['key']
-
def _delete_user(self, entity):
self.mgr.check_mon_command({
'prefix': 'auth rm',
except Exception as e:
return exception_handler(e, f"Failed to delete {pseudo_path} export for {cluster_id}")
+ # Read rados object "export-<ex_id>" from the configured pool/namespace
+ # and parse it into an Export; logs and returns None when the object is
+ # missing (ObjectNotFound).
+ def _fetch_export_obj(self, ex_id):
+ try:
+ with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
+ ioctx.set_namespace(self.rados_namespace)
+ export = Export.from_export_block(GaneshaConfParser(ioctx.read(f"export-{ex_id}"
+ ).decode("utf-8")).parse()[0], self.rados_namespace)
+ return export
+ except ObjectNotFound:
+ log.exception(f"Export ID: {ex_id} not found")
+
+ # Cache the export in memory and persist it as rados object
+ # "export-<export_id>", updating the cluster's common config object.
+ # NOTE(review): the cache is keyed by rados_namespace here but by
+ # cluster_id in create_export — presumably always equal; confirm.
+ def _update_export(self, export):
+ self.exports[self.rados_namespace].append(export)
+ NFSRados(self.mgr, self.rados_namespace).update_obj(
+ GaneshaConfParser.write_block(export.to_export_block()),
+ f'export-{export.export_id}', f'conf-nfs.{export.cluster_id}')
+
def format_path(self, path):
if path:
path = normpath(path.strip())
return path
@export_cluster_checker
- def create_export(self, fs_name, cluster_id, pseudo_path, read_only, path):
+ # Dispatch export creation to the FSAL-specific handler; only 'cephfs'
+ # is implemented. kwargs must include 'fsal_type', plus 'pseudo_path'
+ # and 'cluster_id' (used in the failure message below).
+ def create_export(self, **kwargs):
try:
- if not check_fs(self.mgr, fs_name):
- raise FSNotFound(fs_name)
-
- pseudo_path = self.format_path(pseudo_path)
- ValidateExport.pseudo_path(pseudo_path)
-
- if cluster_id not in self.exports:
- self.exports[cluster_id] = []
-
- if not self._fetch_export(pseudo_path):
- ex_id = self._gen_export_id()
- user_id = f"{cluster_id}{ex_id}"
- user_out, key = self._create_user_key(user_id, path, fs_name, read_only)
- access_type = "RW"
- if read_only:
- access_type = "RO"
- ex_dict = {
- 'path': self.format_path(path),
- 'pseudo': pseudo_path,
- 'cluster_id': cluster_id,
- 'access_type': access_type,
- 'fsal': {"name": "CEPH", "user_id": user_id,
- "fs_name": fs_name, "sec_label_xattr": ""},
- 'clients': []
- }
- export = Export.from_dict(ex_id, ex_dict)
- export.fsal.cephx_key = key
- self._save_export(export)
- result = {
- "bind": pseudo_path,
- "fs": fs_name,
- "path": path,
- "cluster": cluster_id,
- "mode": access_type,
- }
- return (0, json.dumps(result, indent=4), '')
- return 0, "", "Export already exists"
+ # 'fsal_type' is popped so the remaining kwargs match the
+ # FSAL handler's signature.
+ fsal_type = kwargs.pop('fsal_type')
+ if fsal_type == 'cephfs':
+ return FSExport(self).create_export(**kwargs)
+ raise NotImplementedError()
except Exception as e:
- return exception_handler(e, f"Failed to create {pseudo_path} export for {cluster_id}")
+ return exception_handler(e, f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}")
@export_cluster_checker
def delete_export(self, cluster_id, pseudo_path):
except Exception as e:
return exception_handler(e, f"Failed to get {pseudo_path} export for {cluster_id}")
- def _fetch_export_obj(self, ex_id):
+ # Parse the JSON export config (passed via `ceph nfs export update -i`)
+ # and delegate to the FSAL-specific handler (currently CephFS only).
+ # NotImplementedError is treated as "applied, but NFS pods need a manual
+ # restart" — presumably raised by the service-restart path; confirm.
+ def update_export(self, export_config):
try:
- with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
- ioctx.set_namespace(self.rados_namespace)
- export = Export.from_export_block(GaneshaConfParser(ioctx.read(f"export-{ex_id}"
- ).decode("utf-8")).parse()[0], self.rados_namespace)
- return export
- except ObjectNotFound:
- log.exception(f"Export ID: {ex_id} not found")
+ if not export_config:
+ raise NFSInvalidOperation("Empty Config!!")
+ new_export = json.loads(export_config)
+ # check export type
+ return FSExport(self).update_export(new_export)
+ except NotImplementedError:
+ return 0, " Manual Restart of NFS PODS required for successful update of exports", ""
+ except Exception as e:
+ return exception_handler(e, f'Failed to update export: {e}')
+
+
+# CephFS-specific export operations; constructed from an ExportMgr so it
+# shares the mgr handle, rados namespace and cached export list.
+class FSExport(ExportMgr):
+ def __init__(self, export_mgr_obj):
+ super().__init__(export_mgr_obj.mgr, export_mgr_obj.rados_namespace,
+ export_mgr_obj._exports)
def _validate_export(self, new_export_dict):
if new_export_dict['cluster_id'] not in available_clusters(self.mgr):
else:
new_export_dict['pseudo'] = self.format_path(new_export_dict['pseudo'])
ValidateExport.pseudo_path(new_export_dict['pseudo'])
- log.debug(f"Pseudo path has changed from {export.pseudo} to "\
+ log.debug(f"Pseudo path has changed from {export.pseudo} to "
f"{new_export_dict['pseudo']}")
# Check if squash changed
if export.squash != new_export_dict['squash']:
log.info(f"Export user updated {user_id}")
- def _update_export(self, export):
- self.exports[self.rados_namespace].append(export)
- NFSRados(self.mgr, self.rados_namespace).update_obj(
- GaneshaConfParser.write_block(export.to_export_block()),
- f'export-{export.export_id}', f'conf-nfs.{export.cluster_id}')
+ # Create (or fetch, via `auth get-or-create`) a cephx user for this
+ # export, capped to the given fs path (read-only or rw) and to the
+ # cluster's rados pool/namespace; returns (entity, key).
+ def _create_user_key(self, entity, path, fs_name, fs_ro):
+ osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
+ self.rados_pool, self.rados_namespace, fs_name)
+ access_type = 'r' if fs_ro else 'rw'
- def update_export(self, export_config):
- try:
- if not export_config:
- raise NFSInvalidOperation("Empty Config!!")
- update_export = json.loads(export_config)
- old_export, update_user_caps = self._validate_export(update_export)
- if update_user_caps:
- self._update_user_id(update_export['path'], update_export['access_type'],
- update_export['fsal']['fs_name'], update_export['fsal']['user_id'])
- update_export = Export.from_dict(update_export['export_id'], update_export)
- update_export.fsal.cephx_key = old_export.fsal.cephx_key
- self._update_export(update_export)
- export_ls = self.exports[self.rados_namespace]
- if old_export not in export_ls:
- # This happens when export is fetched by ID
- old_export = self._fetch_export(old_export.pseudo)
- export_ls.remove(old_export)
- restart_nfs_service(self.mgr, update_export.cluster_id)
- return 0, "Successfully updated export", ""
- except NotImplementedError:
- return 0, " Manual Restart of NFS PODS required for successful update of exports", ""
- except Exception as e:
- return exception_handler(e, f'Failed to update export: {e}')
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'auth get-or-create',
+ 'entity': 'client.{}'.format(entity),
+ 'caps': ['mon', 'allow r', 'osd', osd_cap, 'mds', 'allow {} path={}'.format(
+ access_type, path)],
+ 'format': 'json',
+ })
+
+ json_res = json.loads(out)
+ log.info("Export user created is {}".format(json_res[0]['entity']))
+ return json_res[0]['entity'], json_res[0]['key']
+
+
+ # Create a CephFS export: validate the filesystem and pseudo path,
+ # allocate an export id and a dedicated cephx user, persist the export,
+ # and return a JSON summary. If an export already exists at pseudo_path,
+ # nothing is created.
+ def create_export(self, fs_name, cluster_id, pseudo_path, read_only, path):
+ if not check_fs(self.mgr, fs_name):
+ raise FSNotFound(fs_name)
+
+ pseudo_path = self.format_path(pseudo_path)
+ ValidateExport.pseudo_path(pseudo_path)
+
+ if cluster_id not in self.exports:
+ self.exports[cluster_id] = []
+
+ # Only create when no export already claims this pseudo path.
+ if not self._fetch_export(pseudo_path):
+ ex_id = self._gen_export_id()
+ # Per-export cephx user, named from cluster id + export id.
+ user_id = f"{cluster_id}{ex_id}"
+ user_out, key = self._create_user_key(user_id, path, fs_name, read_only)
+ access_type = "RW"
+ if read_only:
+ access_type = "RO"
+ ex_dict = {
+ 'path': self.format_path(path),
+ 'pseudo': pseudo_path,
+ 'cluster_id': cluster_id,
+ 'access_type': access_type,
+ 'fsal': {"name": "CEPH", "user_id": user_id,
+ "fs_name": fs_name, "sec_label_xattr": ""},
+ 'clients': []
+ }
+ export = Export.from_dict(ex_id, ex_dict)
+ export.fsal.cephx_key = key
+ self._save_export(export)
+ result = {
+ "bind": pseudo_path,
+ "fs": fs_name,
+ "path": path,
+ "cluster": cluster_id,
+ "mode": access_type,
+ }
+ return (0, json.dumps(result, indent=4), '')
+ return 0, "", "Export already exists"
+
+
+ # Apply a validated export update: refresh the cephx user's caps when
+ # needed, persist the new export, evict the stale cached entry, and
+ # restart the NFS service so the change takes effect. Exceptions
+ # propagate to ExportMgr.update_export, which formats the error.
+ def update_export(self, new_export):
+ old_export, update_user_caps = self._validate_export(new_export)
+ if update_user_caps:
+ self._update_user_id(new_export['path'], new_export['access_type'],
+ new_export['fsal']['fs_name'], new_export['fsal']['user_id'])
+ new_export = Export.from_dict(new_export['export_id'], new_export)
+ # Keep the existing key; only caps may have changed above.
+ new_export.fsal.cephx_key = old_export.fsal.cephx_key
+ self._update_export(new_export)
+ export_ls = self.exports[self.rados_namespace]
+ if old_export not in export_ls:
+ # This happens when export is fetched by ID
+ old_export = self._fetch_export(old_export.pseudo)
+ export_ls.remove(old_export)
+ restart_nfs_service(self.mgr, new_export.cluster_id)
+ return 0, "Successfully updated export", ""
class NFSCluster:
from volumes.module import mgr_cmd_wrap
import orchestrator
-from .export.nfs import NFSCluster, FSExport
+from .export.nfs import NFSCluster, ExportMgr
log = logging.getLogger(__name__)
self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
with self.lock:
- self.fs_export = FSExport(self)
+ self.export_mgr = ExportMgr(self)
self.nfs = NFSCluster(self)
self.inited = True
@mgr_cmd_wrap
def _cmd_nfs_export_create_cephfs(self, inbuf, cmd):
#TODO Extend export creation for rgw.
- return self.fs_export.create_export(fs_name=cmd['fsname'], cluster_id=cmd['clusterid'],
- pseudo_path=cmd['binding'], read_only=cmd.get('readonly', False), path=cmd.get('path', '/'))
+ # Routed through ExportMgr with fsal_type so other FSALs can plug in.
+ return self.export_mgr.create_export(fsal_type='cephfs', fs_name=cmd['fsname'],
+ cluster_id=cmd['clusterid'],
+ pseudo_path=cmd['binding'],
+ read_only=cmd.get('readonly', False),
+ path=cmd.get('path', '/'))
@mgr_cmd_wrap
def _cmd_nfs_export_delete(self, inbuf, cmd):
- return self.fs_export.delete_export(cluster_id=cmd['clusterid'], pseudo_path=cmd['binding'])
+ # Delete the export bound at cmd['binding'] from cluster cmd['clusterid'].
+ return self.export_mgr.delete_export(cluster_id=cmd['clusterid'], pseudo_path=cmd['binding'])
@mgr_cmd_wrap
def _cmd_nfs_export_ls(self, inbuf, cmd):
- return self.fs_export.list_exports(cluster_id=cmd['clusterid'], detailed=cmd.get('detailed', False))
+ # List exports for cmd['clusterid']; 'detailed' defaults to False.
+ return self.export_mgr.list_exports(cluster_id=cmd['clusterid'], detailed=cmd.get('detailed', False))
@mgr_cmd_wrap
def _cmd_nfs_export_get(self, inbuf, cmd):
- return self.fs_export.get_export(cluster_id=cmd['clusterid'], pseudo_path=cmd['binding'])
+ # Fetch a single export by its pseudo-path binding.
+ return self.export_mgr.get_export(cluster_id=cmd['clusterid'], pseudo_path=cmd['binding'])
@mgr_cmd_wrap
def _cmd_nfs_export_update(self, inbuf, cmd):
# The export <json_file> is passed to -i and it's processing is handled by the Ceph CLI.
- return self.fs_export.update_export(export_config=inbuf)
+ # Parsing/validation of the JSON body happens in ExportMgr.update_export.
+ return self.export_mgr.update_export(export_config=inbuf)
@mgr_cmd_wrap
def _cmd_nfs_cluster_create(self, inbuf, cmd):