import cephfs
import cherrypy
+from .. import mgr
from ..security import Scope
from ..services.cephfs import CephFS
-from ..services.cephx import CephX
from ..services.exception import DashboardException, serialize_dashboard_exception
-from ..services.ganesha import Ganesha, GaneshaConf, NFSException
from ..services.rgw_client import NoCredentialsException, \
NoRgwDaemonsException, RequestException, RgwClient
from . import APIDoc, APIRouter, BaseController, Endpoint, EndpointDoc, \
ReadPermission, RESTController, Task, UIRouter
-logger = logging.getLogger('controllers.ganesha')
+logger = logging.getLogger('controllers.nfs')
+
+
+class NFSException(DashboardException):
+ def __init__(self, msg):
+ super(NFSException, self).__init__(component="nfs", msg=msg)
# documentation helpers
def status(self):
status = {'available': True, 'message': None}
try:
- Ganesha.get_ganesha_clusters()
- except NFSException as e:
+ mgr.remote('nfs', 'is_active')
+ except (NameError, ImportError) as e:
status['message'] = str(e) # type: ignore
status['available'] = False
@EndpointDoc("List all NFS-Ganesha exports",
responses={200: [EXPORT_SCHEMA]})
def list(self):
- result = []
- for cluster_id in Ganesha.get_ganesha_clusters():
- result.extend(
- [export.to_dict()
- for export in GaneshaConf.instance(cluster_id).list_exports()])
- return result
+ return mgr.remote('nfs', 'export_ls')
@NfsTask('create', {'path': '{path}', 'fsal': '{fsal.name}',
'cluster_id': '{cluster_id}'}, 2.0)
def create(self, path, cluster_id, daemons, pseudo, tag, access_type,
squash, security_label, protocols, transports, fsal, clients,
reload_daemons=True):
- if fsal['name'] not in Ganesha.fsals_available():
+ if fsal['name'] not in mgr.remote('nfs', 'cluster_fsals'):
raise NFSException("Cannot create this export. "
"FSAL '{}' cannot be managed by the dashboard."
.format(fsal['name']))
- ganesha_conf = GaneshaConf.instance(cluster_id)
- ex_id = ganesha_conf.create_export({
+ fsal.pop('user_id') # mgr/nfs does not let you customize user_id
+ # FIXME: what was this? 'tag': tag,
+ raw_ex = {
'path': path,
'pseudo': pseudo,
'cluster_id': cluster_id,
'daemons': daemons,
- 'tag': tag,
'access_type': access_type,
'squash': squash,
'security_label': security_label,
'transports': transports,
'fsal': fsal,
'clients': clients
- })
- if reload_daemons:
- ganesha_conf.reload_daemons(daemons)
- return ganesha_conf.get_export(ex_id).to_dict()
+ }
+ export = mgr.remote('nfs', 'export_apply', cluster_id, raw_ex)
+ return export
@EndpointDoc("Get an NFS-Ganesha export",
parameters={
},
responses={200: EXPORT_SCHEMA})
def get(self, cluster_id, export_id):
- export_id = int(export_id)
- ganesha_conf = GaneshaConf.instance(cluster_id)
- if not ganesha_conf.has_export(export_id):
- raise cherrypy.HTTPError(404)
- return ganesha_conf.get_export(export_id).to_dict()
+ return mgr.remote('nfs', 'export_get', cluster_id, export_id)
@NfsTask('edit', {'cluster_id': '{cluster_id}', 'export_id': '{export_id}'},
2.0)
squash, security_label, protocols, transports, fsal, clients,
reload_daemons=True):
export_id = int(export_id)
- ganesha_conf = GaneshaConf.instance(cluster_id)
- if not ganesha_conf.has_export(export_id):
+ if not mgr.remote('nfs', 'export_get', export_id):
raise cherrypy.HTTPError(404) # pragma: no cover - the handling is too obvious
- if fsal['name'] not in Ganesha.fsals_available():
+ if fsal['name'] not in mgr.remote('nfs', 'cluster_fsals'):
raise NFSException("Cannot make modifications to this export. "
"FSAL '{}' cannot be managed by the dashboard."
.format(fsal['name']))
- old_export = ganesha_conf.update_export({
- 'export_id': export_id,
+ fsal.pop('user_id') # mgr/nfs does not let you customize user_id
+ # FIXME: what was this? 'tag': tag,
+ raw_ex = {
'path': path,
+ 'pseudo': pseudo,
'cluster_id': cluster_id,
'daemons': daemons,
- 'pseudo': pseudo,
- 'tag': tag,
'access_type': access_type,
'squash': squash,
'security_label': security_label,
'transports': transports,
'fsal': fsal,
'clients': clients
- })
- daemons = list(daemons)
- for d_id in old_export.daemons:
- if d_id not in daemons:
- daemons.append(d_id)
- if reload_daemons:
- ganesha_conf.reload_daemons(daemons)
- return ganesha_conf.get_export(export_id).to_dict()
+ }
+ export = mgr.remote('nfs', 'export_apply', cluster_id, raw_ex)
+ return export
@NfsTask('delete', {'cluster_id': '{cluster_id}',
'export_id': '{export_id}'}, 2.0)
})
def delete(self, cluster_id, export_id, reload_daemons=True):
export_id = int(export_id)
- ganesha_conf = GaneshaConf.instance(cluster_id)
- if not ganesha_conf.has_export(export_id):
+ export = mgr.remote('nfs', 'export_get', cluster_id, export_id)
+ if not export:
raise cherrypy.HTTPError(404) # pragma: no cover - the handling is too obvious
- export = ganesha_conf.remove_export(export_id)
- if reload_daemons:
- ganesha_conf.reload_daemons(export.daemons)
+ mgr.remote('nfs', 'export_rm', cluster_id, export['pseudo'])
@APIRouter('/nfs-ganesha/daemon', Scope.NFS_GANESHA)
responses={200: [{
'daemon_id': (str, 'Daemon identifier'),
'cluster_id': (str, 'Cluster identifier'),
- 'cluster_type': (str, 'Cluster type'),
+ 'cluster_type': (str, 'Cluster type'), # FIXME: remove this property
'status': (int, 'Status of daemon', True),
'desc': (str, 'Status description', True)
}]})
def list(self):
- result = []
- for cluster_id in Ganesha.get_ganesha_clusters():
- result.extend(GaneshaConf.instance(cluster_id).list_daemons())
- return result
+ # FIXME: remove this; dashboard should only care about clusters.
+ return mgr.remote('nfs', 'daemon_ls')
@UIRouter('/nfs-ganesha', Scope.NFS_GANESHA)
@Endpoint('GET', '/cephx/clients')
@ReadPermission
def cephx_clients(self):
- return list(CephX.list_clients())
+ # FIXME: remove this; cephx users/creds are managed by mgr/nfs
+ return ['admin']
@Endpoint('GET', '/fsals')
@ReadPermission
def fsals(self):
- return Ganesha.fsals_available()
+ return mgr.remote('nfs', 'cluster_fsals')
@Endpoint('GET', '/lsdir')
@ReadPermission
@Endpoint('GET', '/clusters')
@ReadPermission
def clusters(self):
- return Ganesha.get_ganesha_clusters()
+ return mgr.remote('nfs', 'cluster_ls')
+++ /dev/null
-# -*- coding: utf-8 -*-
-# pylint: disable=too-many-lines
-
-import logging
-import os
-import re
-from typing import Any, Dict, List, Optional, cast
-
-from ceph.deployment.service_spec import NFSServiceSpec
-from orchestrator import DaemonDescription, OrchestratorError, ServiceDescription
-
-from .. import mgr
-from ..exceptions import DashboardException
-from ..settings import Settings
-from .cephfs import CephFS
-from .cephx import CephX
-from .orchestrator import OrchClient
-from .rgw_client import NoCredentialsException, NoRgwDaemonsException, RequestException, RgwClient
-
-logger = logging.getLogger('ganesha')
-
-
-class NFSException(DashboardException):
- def __init__(self, msg):
- super(NFSException, self).__init__(component="nfs", msg=msg)
-
-
-class Ganesha(object):
- @classmethod
- def _get_clusters_locations(cls):
- # pylint: disable=too-many-branches
- # Get Orchestrator clusters
- orch_result = cls._get_orch_clusters_locations()
-
- # Get user-defined clusters
- location_list_str = Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE
- if not orch_result and not location_list_str:
- raise NFSException("NFS-Ganesha cluster is not detected. "
- "Please set the GANESHA_RADOS_POOL_NAMESPACE "
- "setting or deploy an NFS-Ganesha cluster with the Orchestrator.")
- result = {} # type: ignore
- location_list = [loc.strip() for loc in location_list_str.split(
- ",")] if location_list_str else []
- for location in location_list:
- if not location:
- raise NFSException("Invalid Ganesha cluster RADOS "
- "[cluster_id:]pool/namespace setting: {}"
- .format(location))
- if location.count(':') < 1:
- # default cluster_id
- if location.count('/') > 1:
- raise NFSException("Invalid Ganesha RADOS pool/namespace "
- "setting: {}".format(location))
- # in this case accept pool/namespace only
- cluster = "_default_"
- if location.count('/') == 0:
- pool, namespace = location, None
- else:
- pool, namespace = location.split('/', 1)
- else:
- cluster = location[:location.find(':')]
- pool_nm = location[location.find(':')+1:]
- if pool_nm.count('/') == 0:
- pool, namespace = pool_nm, None
- else:
- pool, namespace = pool_nm.split('/', 1)
-
- # Check pool/namespace collision.
- for clusters in [orch_result, result]:
- for cluster_name, cluster_data in clusters.items():
- if cluster_data['pool'] == pool and cluster_data['namespace'] == namespace:
- raise NFSException(
- f'Pool `{pool}` and namespace `{namespace}` are already in use by '
- f"""NFS-Ganesha cluster called `{cluster_name}`{" that is deployed by "
- "the Orchestrator" if cluster_data['type'] == ClusterType.ORCHESTRATOR
- else ''}. """
- 'Please update GANESHA_RADOS_POOL_NAMESPACE setting.'
- )
-
- if cluster in orch_result:
- # cephadm might have set same cluster settings, ask the user to remove it.
- raise NFSException(
- 'Detected a conflicting NFS-Ganesha cluster name `{0}`. There exists an '
- 'NFS-Ganesha cluster called `{0}` that is deployed by the Orchestrator. '
- 'Please remove or rename the cluster from the GANESHA_RADOS_POOL_NAMESPACE '
- 'setting.'.format(cluster))
-
- if cluster in result:
- raise NFSException("Duplicate Ganesha cluster definition in "
- "the setting: {}".format(location_list_str))
- result[cluster] = {
- 'pool': pool,
- 'namespace': namespace,
- 'type': ClusterType.USER,
- 'daemon_conf': None
- }
- return {**orch_result, **result}
-
- @classmethod
- def _get_orch_clusters_locations(cls):
- orch_result = {} # type: ignore
- services = cls._get_orch_nfs_services()
- for service in services:
- spec = cast(NFSServiceSpec, service.spec)
- try:
- orch_result[spec.service_id] = {
- 'pool': 'nfs-ganesha',
- 'namespace': spec.service_id,
- 'type': ClusterType.ORCHESTRATOR,
- 'daemon_conf': spec.rados_config_name()
- }
- except AttributeError as ex:
- logger.warning('Error when getting NFS service from the Orchestrator. %s', str(ex))
- continue
- return orch_result
-
- @classmethod
- def get_ganesha_clusters(cls):
- return list(cls._get_clusters_locations())
-
- @staticmethod
- def _get_orch_nfs_services() -> List[ServiceDescription]:
- try:
- return OrchClient.instance().services.list('nfs')
- except (RuntimeError, OrchestratorError, ImportError):
- return []
-
- @classmethod
- def parse_rados_url(cls, rados_url):
- if not rados_url.startswith("rados://"):
- raise NFSException("Invalid NFS Ganesha RADOS configuration URL: {}"
- .format(rados_url))
- rados_url = rados_url[8:]
- url_comps = rados_url.split("/")
- if len(url_comps) < 2 or len(url_comps) > 3:
- raise NFSException("Invalid NFS Ganesha RADOS configuration URL: "
- "rados://{}".format(rados_url))
- if len(url_comps) == 2:
- return url_comps[0], None, url_comps[1]
- return url_comps
-
- @classmethod
- def make_rados_url(cls, pool, namespace, obj):
- if namespace:
- return "rados://{}/{}/{}".format(pool, namespace, obj)
- return "rados://{}/{}".format(pool, obj)
-
- @classmethod
- def get_cluster(cls, cluster_id):
- locations = cls._get_clusters_locations()
- if cluster_id not in locations:
- raise NFSException("Cluster not found: cluster_id={}"
- .format(cluster_id))
- return locations[cluster_id]
-
- @classmethod
- def fsals_available(cls):
- result = []
- if CephFS.list_filesystems():
- result.append("CEPH")
- try:
- if RgwClient.admin_instance().is_service_online() and \
- RgwClient.admin_instance().is_system_user():
- result.append("RGW")
- except (DashboardException, NoCredentialsException, RequestException,
- NoRgwDaemonsException):
- pass
- return result
-
-
-class GaneshaConfParser(object):
- def __init__(self, raw_config):
- self.pos = 0
- self.text = ""
- self.clean_config(raw_config)
-
- def clean_config(self, raw_config):
- for line in raw_config.split("\n"):
- cardinal_idx = line.find('#')
- if cardinal_idx == -1:
- self.text += line
- else:
- # remove comments
- self.text += line[:cardinal_idx]
- if line.startswith("%"):
- self.text += "\n"
-
- def remove_all_whitespaces(self):
- new_text = ""
- in_string = False
- in_section = False
- for i, cha in enumerate(self.text):
- if in_section:
- if cha != '"' and self.text[i-1] != '\\':
- new_text += cha
- elif cha == '\n':
- new_text += cha
- in_section = False
- elif i == (len(self.text)-1):
- if cha != '"' and self.text[i-1] != '\\':
- new_text += cha
- in_section = False
- elif not in_section and (i == 0 or self.text[i-1] == '\n') and cha == '%':
- in_section = True
- new_text += cha
- elif in_string or cha not in [' ', '\n', '\t']:
- new_text += cha
- elif cha == '"' and self.text[i-1] != '\\':
- in_string = not in_string
- self.text = new_text
-
- def stream(self):
- return self.text[self.pos:]
-
- def parse_block_name(self):
- idx = self.stream().find('{')
- if idx == -1:
- raise Exception("Cannot find block name")
- block_name = self.stream()[:idx]
- self.pos += idx+1
- return block_name
-
- def parse_block_or_section(self):
- if self.stream().startswith("%url"):
- # section line
- self.pos += self.stream().find('rados://')
- idx = self.stream().find('\n')
- if idx == -1:
- value = self.stream()
- self.pos += len(self.stream())
- else:
- value = self.stream()[:idx]
- self.pos += idx+1
- block_dict = {'block_name': '%url', 'value': value}
- return block_dict
-
- block_name = self.parse_block_name().upper()
- block_dict = {'block_name': block_name}
- self.parse_block_body(block_dict)
- if self.stream()[0] != '}':
- raise Exception("No closing bracket '}' found at the end of block")
- self.pos += 1
- return block_dict
-
- def parse_parameter_value(self, raw_value):
- colon_idx = raw_value.find(',')
-
- if colon_idx == -1:
- try:
- return int(raw_value)
- except ValueError:
- if raw_value == "true":
- return True
- if raw_value == "false":
- return False
- if raw_value.find('"') == 0:
- return raw_value[1:-1]
- return raw_value
- else:
- return [self.parse_parameter_value(v.strip())
- for v in raw_value.split(',')]
-
- def parse_stanza(self, block_dict):
- equal_idx = self.stream().find('=')
- semicolon_idx = self.stream().find(';')
- if equal_idx == -1:
- raise Exception("Malformed stanza: no equal symbol found.")
- parameter_name = self.stream()[:equal_idx].lower()
- parameter_value = self.stream()[equal_idx+1:semicolon_idx]
- block_dict[parameter_name] = self.parse_parameter_value(
- parameter_value)
- self.pos += semicolon_idx+1
-
- def parse_block_body(self, block_dict):
- last_pos = self.pos
- while True:
- semicolon_idx = self.stream().find(';')
- lbracket_idx = self.stream().find('{')
- rbracket_idx = self.stream().find('}')
-
- if rbracket_idx == 0:
- # block end
- return
-
- if (semicolon_idx != -1 and lbracket_idx != -1
- and semicolon_idx < lbracket_idx) \
- or (semicolon_idx != -1 and lbracket_idx == -1):
- self.parse_stanza(block_dict)
- elif (semicolon_idx != -1 and lbracket_idx != -1
- and semicolon_idx > lbracket_idx) or (
- semicolon_idx == -1 and lbracket_idx != -1):
- if '_blocks_' not in block_dict:
- block_dict['_blocks_'] = []
- block_dict['_blocks_'].append(self.parse_block_or_section())
- else:
- raise Exception("Malformed stanza: no semicolon found.")
-
- if last_pos == self.pos:
- raise Exception("Infinite loop while parsing block content")
- last_pos = self.pos
-
- def parse(self):
- self.remove_all_whitespaces()
- blocks = []
- while self.stream():
- block_dict = self.parse_block_or_section()
- blocks.append(block_dict)
- return blocks
-
- @staticmethod
- def _indentation(depth, size=4):
- conf_str = ""
- for _ in range(0, depth*size):
- conf_str += " "
- return conf_str
-
- @staticmethod
- def write_block_body(block, depth=0):
- def format_val(key, val):
- if isinstance(val, list):
- return ', '.join([format_val(key, v) for v in val])
- if isinstance(val, bool):
- return str(val).lower()
- if isinstance(val, int) or (block['block_name'] == 'CLIENT'
- and key == 'clients'):
- return '{}'.format(val)
- return '"{}"'.format(val)
-
- conf_str = ""
- for key, val in block.items():
- if key == 'block_name':
- continue
- if key == '_blocks_':
- for blo in val:
- conf_str += GaneshaConfParser.write_block(blo, depth)
- elif val:
- conf_str += GaneshaConfParser._indentation(depth)
- conf_str += '{} = {};\n'.format(key, format_val(key, val))
- return conf_str
-
- @staticmethod
- def write_block(block, depth):
- if block['block_name'] == "%url":
- return '%url "{}"\n\n'.format(block['value'])
-
- conf_str = ""
- conf_str += GaneshaConfParser._indentation(depth)
- conf_str += format(block['block_name'])
- conf_str += " {\n"
- conf_str += GaneshaConfParser.write_block_body(block, depth+1)
- conf_str += GaneshaConfParser._indentation(depth)
- conf_str += "}\n\n"
- return conf_str
-
- @staticmethod
- def write_conf(blocks):
- if not isinstance(blocks, list):
- blocks = [blocks]
- conf_str = ""
- for block in blocks:
- conf_str += GaneshaConfParser.write_block(block, 0)
- return conf_str
-
-
-class FSal(object):
- def __init__(self, name):
- self.name = name
-
- @classmethod
- def validate_path(cls, _):
- raise NotImplementedError()
-
- def validate(self):
- raise NotImplementedError()
-
- def fill_keys(self):
- raise NotImplementedError()
-
- def create_path(self, path):
- raise NotImplementedError()
-
- @staticmethod
- def from_fsal_block(fsal_block):
- if fsal_block['name'] == "CEPH":
- return CephFSFSal.from_fsal_block(fsal_block)
- if fsal_block['name'] == 'RGW':
- return RGWFSal.from_fsal_block(fsal_block)
- return None
-
- def to_fsal_block(self):
- raise NotImplementedError()
-
- @staticmethod
- def from_dict(fsal_dict):
- if fsal_dict['name'] == "CEPH":
- return CephFSFSal.from_dict(fsal_dict)
- if fsal_dict['name'] == 'RGW':
- return RGWFSal.from_dict(fsal_dict)
- return None
-
- def to_dict(self):
- raise NotImplementedError()
-
-
-class RGWFSal(FSal):
- def __init__(self, name, rgw_user_id, access_key, secret_key):
- super(RGWFSal, self).__init__(name)
- self.rgw_user_id = rgw_user_id
- self.access_key = access_key
- self.secret_key = secret_key
-
- @classmethod
- def validate_path(cls, path):
- return path == "/" or re.match(r'^[^/><|&()#?]+$', path)
-
- def validate(self):
- if not self.rgw_user_id:
- raise NFSException('RGW user must be specified')
-
- if not RgwClient.admin_instance().user_exists(self.rgw_user_id):
- raise NFSException("RGW user '{}' does not exist"
- .format(self.rgw_user_id))
-
- def fill_keys(self):
- keys = RgwClient.admin_instance().get_user_keys(self.rgw_user_id)
- self.access_key = keys['access_key']
- self.secret_key = keys['secret_key']
-
- def create_path(self, path):
- if path == '/': # nothing to do
- return
- rgw = RgwClient.instance(self.rgw_user_id)
- try:
- exists = rgw.bucket_exists(path, self.rgw_user_id)
- logger.debug('Checking existence of RGW bucket "%s" for user "%s": %s',
- path, self.rgw_user_id, exists)
- except RequestException as exp:
- if exp.status_code == 403:
- raise NFSException('Cannot create bucket "{}" as it already '
- 'exists, and belongs to other user.'
- .format(path))
- raise exp
- if not exists:
- logger.info('Creating new RGW bucket "%s" for user "%s"', path,
- self.rgw_user_id)
- rgw.create_bucket(path)
-
- @classmethod
- def from_fsal_block(cls, fsal_block):
- return cls(fsal_block['name'],
- fsal_block['user_id'],
- fsal_block['access_key_id'],
- fsal_block['secret_access_key'])
-
- def to_fsal_block(self):
- return {
- 'block_name': 'FSAL',
- 'name': self.name,
- 'user_id': self.rgw_user_id,
- 'access_key_id': self.access_key,
- 'secret_access_key': self.secret_key
- }
-
- @classmethod
- def from_dict(cls, fsal_dict):
- return cls(fsal_dict['name'], fsal_dict['rgw_user_id'], None, None)
-
- def to_dict(self):
- return {
- 'name': self.name,
- 'rgw_user_id': self.rgw_user_id
- }
-
-
-class CephFSFSal(FSal):
- def __init__(self, name, user_id=None, fs_name=None, sec_label_xattr=None,
- cephx_key=None):
- super(CephFSFSal, self).__init__(name)
- self.fs_name = fs_name
- self.user_id = user_id
- self.sec_label_xattr = sec_label_xattr
- self.cephx_key = cephx_key
-
- @classmethod
- def validate_path(cls, path):
- return re.match(r'^/[^><|&()?]*$', path)
-
- def validate(self):
- if self.user_id and self.user_id not in CephX.list_clients():
- raise NFSException("cephx user '{}' does not exist"
- .format(self.user_id))
-
- def fill_keys(self):
- if self.user_id:
- self.cephx_key = CephX.get_client_key(self.user_id)
-
- def create_path(self, path):
- cfs = CephFS(self.fs_name)
- if path == os.sep:
- return
- cfs.mk_dirs(path)
-
- @classmethod
- def from_fsal_block(cls, fsal_block):
- return cls(fsal_block['name'],
- fsal_block.get('user_id', None),
- fsal_block.get('filesystem', None),
- fsal_block.get('sec_label_xattr', None),
- fsal_block.get('secret_access_key', None))
-
- def to_fsal_block(self):
- result = {
- 'block_name': 'FSAL',
- 'name': self.name,
- }
- if self.user_id:
- result['user_id'] = self.user_id
- if self.fs_name:
- result['filesystem'] = self.fs_name
- if self.sec_label_xattr:
- result['sec_label_xattr'] = self.sec_label_xattr
- if self.cephx_key:
- result['secret_access_key'] = self.cephx_key
- return result
-
- @classmethod
- def from_dict(cls, fsal_dict):
- return cls(fsal_dict['name'], fsal_dict['user_id'],
- fsal_dict['fs_name'], fsal_dict['sec_label_xattr'], None)
-
- def to_dict(self):
- return {
- 'name': self.name,
- 'user_id': self.user_id,
- 'fs_name': self.fs_name,
- 'sec_label_xattr': self.sec_label_xattr
- }
-
-
-class Client(object):
- def __init__(self, addresses, access_type=None, squash=None):
- self.addresses = addresses
- self.access_type = access_type
- self.squash = GaneshaConf.format_squash(squash)
-
- @classmethod
- def from_client_block(cls, client_block):
- addresses = client_block['clients']
- if not isinstance(addresses, list):
- addresses = [addresses]
- return cls(addresses,
- client_block.get('access_type', None),
- client_block.get('squash', None))
-
- def to_client_block(self):
- result = {
- 'block_name': 'CLIENT',
- 'clients': self.addresses,
- }
- if self.access_type:
- result['access_type'] = self.access_type
- if self.squash:
- result['squash'] = self.squash
- return result
-
- @classmethod
- def from_dict(cls, client_dict):
- return cls(client_dict['addresses'], client_dict['access_type'],
- client_dict['squash'])
-
- def to_dict(self):
- return {
- 'addresses': self.addresses,
- 'access_type': self.access_type,
- 'squash': self.squash
- }
-
-
-class Export(object):
- # pylint: disable=R0902
- def __init__(self, export_id, path, fsal, cluster_id, daemons, pseudo=None,
- tag=None, access_type=None, squash=None,
- attr_expiration_time=None, security_label=False,
- protocols=None, transports=None, clients=None):
- self.export_id = export_id
- self.path = GaneshaConf.format_path(path)
- self.fsal = fsal
- self.cluster_id = cluster_id
- self.daemons = set(daemons)
- self.pseudo = GaneshaConf.format_path(pseudo)
- self.tag = tag
- self.access_type = access_type
- self.squash = GaneshaConf.format_squash(squash)
- if attr_expiration_time is None:
- self.attr_expiration_time = 0
- else:
- self.attr_expiration_time = attr_expiration_time
- self.security_label = security_label
- self.protocols = {GaneshaConf.format_protocol(p) for p in protocols}
- self.transports = set(transports)
- self.clients = clients
-
- def validate(self):
- # pylint: disable=R0912
- if not self.fsal.validate_path(self.path):
- raise NFSException("Export path ({}) is invalid.".format(self.path))
-
- if not self.protocols:
- raise NFSException(
- "No NFS protocol version specified for the export.")
-
- if not self.transports:
- raise NFSException(
- "No network transport type specified for the export.")
-
- for t in self.transports:
- match = re.match(r'^TCP$|^UDP$', t)
- if not match:
- raise NFSException(
- "'{}' is an invalid network transport type identifier"
- .format(t))
-
- self.fsal.validate()
-
- if 4 in self.protocols:
- if not self.pseudo:
- raise NFSException(
- "Pseudo path is required when NFSv4 protocol is used")
- match = re.match(r'^/[^><|&()]*$', self.pseudo)
- if not match:
- raise NFSException(
- "Export pseudo path ({}) is invalid".format(self.pseudo))
-
- if self.tag:
- match = re.match(r'^[^/><|:&()]+$', self.tag)
- if not match:
- raise NFSException(
- "Export tag ({}) is invalid".format(self.tag))
-
- if self.fsal.name == 'RGW' and 4 not in self.protocols and not self.tag:
- raise NFSException(
- "Tag is mandatory for RGW export when using only NFSv3")
-
- @classmethod
- def from_export_block(cls, export_block, cluster_id, defaults):
- logger.debug("parsing export block: %s", export_block)
-
- fsal_block = [b for b in export_block['_blocks_']
- if b['block_name'] == "FSAL"]
-
- protocols = export_block.get('protocols', defaults['protocols'])
- if not isinstance(protocols, list):
- protocols = [protocols]
-
- transports = export_block.get('transports', defaults['transports'])
- if not isinstance(transports, list):
- transports = [transports]
-
- client_blocks = [b for b in export_block['_blocks_']
- if b['block_name'] == "CLIENT"]
-
- return cls(export_block['export_id'],
- export_block['path'],
- FSal.from_fsal_block(fsal_block[0]),
- cluster_id,
- [],
- export_block.get('pseudo', None),
- export_block.get('tag', None),
- export_block.get('access_type', defaults['access_type']),
- export_block.get('squash', defaults['squash']),
- export_block.get('attr_expiration_time', None),
- export_block.get('security_label', False),
- protocols,
- transports,
- [Client.from_client_block(client)
- for client in client_blocks])
-
- def to_export_block(self, defaults):
- # pylint: disable=too-many-branches
- result = {
- 'block_name': 'EXPORT',
- 'export_id': self.export_id,
- 'path': self.path
- }
- if self.pseudo:
- result['pseudo'] = self.pseudo
- if self.tag:
- result['tag'] = self.tag
- if 'access_type' not in defaults \
- or self.access_type != defaults['access_type']:
- result['access_type'] = self.access_type
- if 'squash' not in defaults or self.squash != defaults['squash']:
- result['squash'] = self.squash
- if self.fsal.name == 'CEPH':
- result['attr_expiration_time'] = self.attr_expiration_time
- result['security_label'] = self.security_label
- if 'protocols' not in defaults:
- result['protocols'] = list(self.protocols)
- else:
- def_proto = defaults['protocols']
- if not isinstance(def_proto, list):
- def_proto = set([def_proto])
- if self.protocols != def_proto:
- result['protocols'] = list(self.protocols)
- if 'transports' not in defaults:
- result['transports'] = list(self.transports)
- else:
- def_transp = defaults['transports']
- if not isinstance(def_transp, list):
- def_transp = set([def_transp])
- if self.transports != def_transp:
- result['transports'] = list(self.transports)
-
- result['_blocks_'] = [self.fsal.to_fsal_block()]
- result['_blocks_'].extend([client.to_client_block()
- for client in self.clients])
- return result
-
- @classmethod
- def from_dict(cls, export_id, ex_dict, old_export=None):
- return cls(export_id,
- ex_dict['path'],
- FSal.from_dict(ex_dict['fsal']),
- ex_dict['cluster_id'],
- ex_dict['daemons'],
- ex_dict['pseudo'],
- ex_dict['tag'],
- ex_dict['access_type'],
- ex_dict['squash'],
- old_export.attr_expiration_time if old_export else None,
- ex_dict['security_label'],
- ex_dict['protocols'],
- ex_dict['transports'],
- [Client.from_dict(client) for client in ex_dict['clients']])
-
- def to_dict(self):
- return {
- 'export_id': self.export_id,
- 'path': self.path,
- 'fsal': self.fsal.to_dict(),
- 'cluster_id': self.cluster_id,
- 'daemons': sorted(list(self.daemons)),
- 'pseudo': self.pseudo,
- 'tag': self.tag,
- 'access_type': self.access_type,
- 'squash': self.squash,
- 'security_label': self.security_label,
- 'protocols': sorted(list(self.protocols)),
- 'transports': sorted(list(self.transports)),
- 'clients': [client.to_dict() for client in self.clients]
- }
-
-
-class ClusterType(object):
-
- # Ganesha clusters deployed by the Orchestrator.
- ORCHESTRATOR = 'orchestrator'
-
- # Ganesha clusters deployed manually by the user. Specified by using the
- # GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE setting.
- USER = 'user'
-
-
-class GaneshaConf(object):
- # pylint: disable=R0902
-
- def __init__(self, cluster_id, rados_pool, rados_namespace, daemon_confs=None):
- self.cluster_id = cluster_id
- self.rados_pool = rados_pool
- self.rados_namespace = rados_namespace
- self.daemon_confs = daemon_confs if daemon_confs is not None else []
- self.export_conf_blocks = [] # type: ignore
- self.daemons_conf_blocks = {} # type: ignore
- self._defaults = {}
- self.exports = {}
-
- self._read_raw_config()
-
- # load defaults
- def_block = [b for b in self.export_conf_blocks
- if b['block_name'] == "EXPORT_DEFAULTS"]
- self.export_defaults = def_block[0] if def_block else {}
- self._defaults = self.ganesha_defaults(self.export_defaults)
-
- for export_block in [block for block in self.export_conf_blocks
- if block['block_name'] == "EXPORT"]:
- export = Export.from_export_block(export_block, cluster_id,
- self._defaults)
- self.exports[export.export_id] = export
-
- # link daemons to exports
- self._link_daemons_to_exports()
-
- def _link_daemons_to_exports(self):
- raise NotImplementedError()
-
- @classmethod
- def instance(cls, cluster_id):
- cluster = Ganesha.get_cluster(cluster_id)
- if cluster['type'] == ClusterType.ORCHESTRATOR:
- return GaneshaConfOrchestrator(cluster_id, cluster['pool'], cluster['namespace'],
- [cluster['daemon_conf']])
- if cluster['type'] == ClusterType.USER:
- return GaneshaConfUser(cluster_id, cluster['pool'], cluster['namespace'])
- raise NFSException('Unknown cluster type `{}` for cluster `{}`'.format(
- cluster['type'], cluster_id))
-
- def _read_raw_config(self):
-
- def _read_rados_obj(_obj):
- size, _ = _obj.stat()
- return _obj.read(size).decode("utf-8")
-
- with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
- if self.rados_namespace:
- ioctx.set_namespace(self.rados_namespace)
- objs = ioctx.list_objects()
- for obj in objs:
- if obj.key.startswith("export-"):
- raw_config = _read_rados_obj(obj)
- logger.debug("read export configuration from rados "
- "object %s/%s/%s:\n%s", self.rados_pool,
- self.rados_namespace, obj.key, raw_config)
- self.export_conf_blocks.extend(
- GaneshaConfParser(raw_config).parse())
- elif not self.daemon_confs and obj.key.startswith("conf-"):
- # Read all `conf-xxx` for daemon configs.
- raw_config = _read_rados_obj(obj)
- logger.debug("read daemon configuration from rados "
- "object %s/%s/%s:\n%s", self.rados_pool,
- self.rados_namespace, obj.key, raw_config)
- idx = obj.key.find('-')
- self.daemons_conf_blocks[obj.key[idx+1:]] = \
- GaneshaConfParser(raw_config).parse()
-
- if self.daemon_confs:
- # When daemon configs are provided.
- for conf in self.daemon_confs:
- size, _ = ioctx.stat(conf)
- raw_config = ioctx.read(conf, size).decode("utf-8")
- logger.debug("read daemon configuration from rados "
- "object %s/%s/%s:\n%s", self.rados_pool,
- self.rados_namespace, conf, raw_config)
- self.daemons_conf_blocks[conf] = \
- GaneshaConfParser(raw_config).parse()
-
- def _write_raw_config(self, conf_block, obj):
- raw_config = GaneshaConfParser.write_conf(conf_block)
- with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
- if self.rados_namespace:
- ioctx.set_namespace(self.rados_namespace)
- ioctx.write_full(obj, raw_config.encode('utf-8'))
- logger.debug(
- "write configuration into rados object %s/%s/%s:\n%s",
- self.rados_pool, self.rados_namespace, obj, raw_config)
-
- @classmethod
- def ganesha_defaults(cls, export_defaults):
- """
- According to
- https://github.com/nfs-ganesha/nfs-ganesha/blob/next/src/config_samples/export.txt
- """
- return {
- 'access_type': export_defaults.get('access_type', 'NONE'),
- 'protocols': export_defaults.get('protocols', [3, 4]),
- 'transports': export_defaults.get('transports', ['TCP', 'UDP']),
- 'squash': export_defaults.get('squash', 'root_squash')
- }
-
- @classmethod
- def format_squash(cls, squash):
- if squash is None:
- return None
- if squash.lower() in ["no_root_squash", "noidsquash", "none"]:
- return "no_root_squash"
- if squash.lower() in ["rootid", "root_id_squash", "rootidsquash"]:
- return "root_id_squash"
- if squash.lower() in ["root", "root_squash", "rootsquash"]:
- return "root_squash"
- if squash.lower() in ["all", "all_squash", "allsquash",
- "all_anonymous", "allanonymous"]:
- return "all_squash"
- logger.error("could not parse squash value: %s", squash)
- raise NFSException("'{}' is an invalid squash option".format(squash))
-
- @classmethod
- def format_protocol(cls, protocol):
- if str(protocol) in ["NFSV3", "3", "V3", "NFS3"]:
- return 3
- if str(protocol) in ["NFSV4", "4", "V4", "NFS4"]:
- return 4
- logger.error("could not parse protocol value: %s", protocol)
- raise NFSException("'{}' is an invalid NFS protocol version identifier"
- .format(protocol))
-
- @classmethod
- def format_path(cls, path):
- if path is not None:
- path = path.strip()
- if len(path) > 1 and path[-1] == '/':
- path = path[:-1]
- return path
-
- def validate(self, export: Export):
- export.validate()
-
- if 4 in export.protocols: # NFSv4 protocol
- len_prefix = 1
- parent_export = None
- for ex in self.list_exports():
- if export.tag and ex.tag == export.tag:
- raise NFSException(
- "Another export exists with the same tag: {}"
- .format(export.tag))
-
- if export.pseudo and ex.pseudo == export.pseudo:
- raise NFSException(
- "Another export exists with the same pseudo path: {}"
- .format(export.pseudo))
-
- if not ex.pseudo:
- continue
-
- if export.pseudo[:export.pseudo.rfind('/')+1].startswith(ex.pseudo):
- if export.pseudo[len(ex.pseudo)] == '/':
- if len(ex.pseudo) > len_prefix:
- len_prefix = len(ex.pseudo)
- parent_export = ex
-
- if len_prefix > 1:
- # validate pseudo path
- idx = len(parent_export.pseudo) # type: ignore
- idx = idx + 1 if idx > 1 else idx
- real_path = "{}/{}".format(
- parent_export.path # type: ignore
- if len(parent_export.path) > 1 else "", # type: ignore
- export.pseudo[idx:])
- if export.fsal.name == 'CEPH':
- cfs = CephFS()
- if export.path != real_path and not cfs.dir_exists(real_path):
- raise NFSException(
- "Pseudo path ({}) invalid, path {} does not exist."
- .format(export.pseudo, real_path))
-
- def _gen_export_id(self):
- exports = sorted(self.exports)
- nid = 1
- for e_id in exports:
- if e_id == nid:
- nid += 1
- else:
- break
- return nid
-
- def _persist_daemon_configuration(self):
- raise NotImplementedError()
-
- def _save_export(self, export):
- self.validate(export)
- export.fsal.create_path(export.path)
- export.fsal.fill_keys()
- self.exports[export.export_id] = export
- conf_block = export.to_export_block(self.export_defaults)
- self._write_raw_config(conf_block, "export-{}".format(export.export_id))
- self._persist_daemon_configuration()
-
- def _delete_export(self, export_id):
- self._persist_daemon_configuration()
- with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
- if self.rados_namespace:
- ioctx.set_namespace(self.rados_namespace)
- ioctx.remove_object("export-{}".format(export_id))
-
- def list_exports(self):
- return [ex for _, ex in self.exports.items()]
-
- def create_export(self, ex_dict):
- ex_id = self._gen_export_id()
- export = Export.from_dict(ex_id, ex_dict)
- self._save_export(export)
- return ex_id
-
- def has_export(self, export_id):
- return export_id in self.exports
-
- def update_export(self, ex_dict):
- if ex_dict['export_id'] not in self.exports:
- return None
- old_export = self.exports[ex_dict['export_id']]
- del self.exports[ex_dict['export_id']]
- export = Export.from_dict(ex_dict['export_id'], ex_dict, old_export)
- self._save_export(export)
- self.exports[export.export_id] = export
- return old_export
-
- def remove_export(self, export_id):
- if export_id not in self.exports:
- return None
- export = self.exports[export_id]
- del self.exports[export_id]
- self._delete_export(export_id)
- return export
-
- def get_export(self, export_id):
- if export_id in self.exports:
- return self.exports[export_id]
- return None
-
- def list_daemons(self) -> List[Dict[str, Any]]:
- raise NotImplementedError()
-
- def list_daemon_confs(self):
- return self.daemons_conf_blocks.keys()
-
- def reload_daemons(self, daemons):
- with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
- if self.rados_namespace:
- ioctx.set_namespace(self.rados_namespace)
- for daemon_id in daemons:
- ioctx.notify("conf-{}".format(daemon_id))
-
-
-class GaneshaConfOrchestrator(GaneshaConf):
- @classmethod
- def _get_orch_nfs_instances(cls,
- service_name: Optional[str] = None) -> List[DaemonDescription]:
- try:
- return OrchClient.instance().services.\
- list_daemons(service_name=service_name, daemon_type="nfs")
- except (RuntimeError, OrchestratorError, ImportError):
- return []
-
- def _link_daemons_to_exports(self):
- instances = self._get_orch_nfs_instances('nfs.{}'.format(self.cluster_id))
- daemon_ids = {instance.daemon_id for instance in instances}
- for _, daemon_blocks in self.daemons_conf_blocks.items():
- for block in daemon_blocks:
- if block['block_name'] == "%url":
- rados_url = block['value']
- _, _, obj = Ganesha.parse_rados_url(rados_url)
- if obj.startswith("export-"):
- export_id = int(obj[obj.find('-')+1:])
- self.exports[export_id].daemons.update(daemon_ids)
-
- def validate(self, export: Export):
- daemons_list = {d['daemon_id'] for d in self.list_daemons()}
- if export.daemons and set(export.daemons) != daemons_list:
- raise NFSException('Export should be linked to all daemons.')
- super().validate(export)
-
- def _persist_daemon_configuration(self):
- daemon_map = {} # type: ignore
- for daemon_id in self.list_daemon_confs():
- daemon_map[daemon_id] = []
-
- for daemon_id in self.list_daemon_confs():
- for _, ex in self.exports.items():
- if ex.daemons:
- daemon_map[daemon_id].append({
- 'block_name': "%url",
- 'value': Ganesha.make_rados_url(
- self.rados_pool, self.rados_namespace,
- "export-{}".format(ex.export_id))
- })
- for daemon_id, conf_blocks in daemon_map.items():
- self._write_raw_config(conf_blocks, daemon_id)
-
- def list_daemons(self) -> List[Dict[str, Any]]:
- instances = self._get_orch_nfs_instances('nfs.{}'.format(self.cluster_id))
- return [{
- 'cluster_id': self.cluster_id,
- 'daemon_id': instance.daemon_id,
- 'cluster_type': ClusterType.ORCHESTRATOR,
- 'status': instance.status,
- 'status_desc': instance.status_desc
- } for instance in instances]
-
- def reload_daemons(self, daemons):
- with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
- if self.rados_namespace:
- ioctx.set_namespace(self.rados_namespace)
- for daemon_id in self.list_daemon_confs():
- ioctx.notify(daemon_id)
-
-
-class GaneshaConfUser(GaneshaConf):
-
- def _link_daemons_to_exports(self):
- for daemon_id, daemon_blocks in self.daemons_conf_blocks.items():
- for block in daemon_blocks:
- if block['block_name'] == "%url":
- rados_url = block['value']
- _, _, obj = Ganesha.parse_rados_url(rados_url)
- if obj.startswith("export-"):
- export_id = int(obj[obj.find('-')+1:])
- self.exports[export_id].daemons.add(daemon_id)
-
- def validate(self, export: Export):
- daemons_list = [d['daemon_id'] for d in self.list_daemons()]
- for daemon_id in export.daemons:
- if daemon_id not in daemons_list:
- raise NFSException("Daemon '{}' does not exist".format(daemon_id))
- super().validate(export)
-
- def _persist_daemon_configuration(self):
- daemon_map = {} # type: ignore
- for daemon_id in self.list_daemon_confs():
- daemon_map[daemon_id] = []
-
- for _, ex in self.exports.items():
- for daemon in ex.daemons:
- daemon_map[daemon].append({
- 'block_name': "%url",
- 'value': Ganesha.make_rados_url(
- self.rados_pool, self.rados_namespace,
- "export-{}".format(ex.export_id))
- })
- for daemon_id, conf_blocks in daemon_map.items():
- self._write_raw_config(conf_blocks, "conf-{}".format(daemon_id))
-
- def list_daemons(self) -> List[Dict[str, Any]]:
- return [{
- 'cluster_id': self.cluster_id,
- 'cluster_type': ClusterType.USER,
- 'daemon_id': daemon_id,
- 'status': 1,
- 'status_desc': 'running'
- } for daemon_id in self.list_daemon_confs()]
# -*- coding: utf-8 -*-
# pylint: disable=too-many-lines
-import unittest
-from unittest.mock import MagicMock, Mock, patch
+from unittest.mock import patch
from urllib.parse import urlencode
-from ceph.deployment.service_spec import NFSServiceSpec
-from orchestrator import DaemonDescription, ServiceDescription
-
-from .. import mgr
from ..controllers.nfsganesha import NFSGaneshaUi
-from ..services import ganesha
-from ..services.ganesha import ClusterType, Export, GaneshaConf, GaneshaConfParser, NFSException
-from ..settings import Settings
from . import ControllerTestCase # pylint: disable=no-name-in-module
-from . import KVStoreMockMixin # pylint: disable=no-name-in-module
-
-
-class GaneshaConfTest(unittest.TestCase, KVStoreMockMixin):
- daemon_raw_config = """
-NFS_CORE_PARAM {
- Enable_NLM = false;
- Enable_RQUOTA = false;
- Protocols = 4;
- NFS_Port = 14000;
- }
-
- MDCACHE {
- Dir_Chunk = 0;
- }
-
- NFSv4 {
- RecoveryBackend = rados_cluster;
- Minor_Versions = 1, 2;
- }
-
- RADOS_KV {
- pool = nfs-ganesha;
- namespace = vstart;
- UserId = vstart;
- nodeid = a;
- }
-
- RADOS_URLS {
- Userid = vstart;
- watch_url = 'rados://nfs-ganesha/vstart/conf-nfs.vstart';
- }
-
- %url rados://nfs-ganesha/vstart/conf-nfs.vstart
-"""
-
- export_1 = """
-EXPORT {
- Export_ID=1;
- Protocols = 4;
- Path = /;
- Pseudo = /cephfs_a/;
- Access_Type = RW;
- Protocols = 4;
- Attr_Expiration_Time = 0;
- # Delegations = R;
- # Squash = root;
-
- FSAL {
- Name = CEPH;
- Filesystem = "a";
- User_Id = "ganesha";
- # Secret_Access_Key = "YOUR SECRET KEY HERE";
- }
-
- CLIENT
- {
- Clients = 192.168.0.10, 192.168.1.0/8;
- Squash = None;
- }
-
- CLIENT
- {
- Clients = 192.168.0.0/16;
- Squash = All;
- Access_Type = RO;
- }
-}
-"""
-
- export_2 = """
-EXPORT
-{
- Export_ID=2;
-
- Path = "/";
-
- Pseudo = "/rgw";
-
- Access_Type = RW;
-
- squash = AllAnonymous;
-
- Protocols = 4, 3;
-
- Transports = TCP, UDP;
-
- FSAL {
- Name = RGW;
- User_Id = "testuser";
- Access_Key_Id ="access_key";
- Secret_Access_Key = "secret_key";
- }
-}
-"""
-
- conf_nodea = '''
-%url "rados://nfs-ganesha/bar/export-2"
-
-%url "rados://nfs-ganesha/bar/export-1"'''
-
- conf_nodeb = '%url "rados://nfs-ganesha/bar/export-1"'
-
- conf_nfs_foo = '''
-%url "rados://nfs-ganesha/foo/export-1"
-
-%url "rados://nfs-ganesha/foo/export-2"'''
-
- class RObject(object):
- def __init__(self, key, raw):
- self.key = key
- self.raw = raw
-
- def read(self, _):
- return self.raw.encode('utf-8')
-
- def stat(self):
- return len(self.raw), None
-
- def _ioctx_write_full_mock(self, key, content):
- if key not in self.temp_store[self.temp_store_namespace]:
- self.temp_store[self.temp_store_namespace][key] = \
- GaneshaConfTest.RObject(key, content.decode('utf-8'))
- else:
- self.temp_store[self.temp_store_namespace][key].raw = content.decode('utf-8')
-
- def _ioctx_remove_mock(self, key):
- del self.temp_store[self.temp_store_namespace][key]
-
- def _ioctx_list_objects_mock(self):
- return [obj for _, obj in self.temp_store[self.temp_store_namespace].items()]
-
- def _ioctl_stat_mock(self, key):
- return self.temp_store[self.temp_store_namespace][key].stat()
-
- def _ioctl_read_mock(self, key, size):
- return self.temp_store[self.temp_store_namespace][key].read(size)
-
- def _ioctx_set_namespace_mock(self, namespace):
- self.temp_store_namespace = namespace
-
- @staticmethod
- def _set_user_defined_clusters_location(clusters_pool_namespace='nfs-ganesha/bar'):
- Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE = clusters_pool_namespace
-
- def setUp(self):
- self.mock_kv_store()
-
- self.clusters = {
- 'foo': {
- 'pool': 'nfs-ganesha',
- 'namespace': 'foo',
- 'type': ClusterType.ORCHESTRATOR,
- 'daemon_conf': 'conf-nfs.foo',
- 'daemons': ['foo.host_a', 'foo.host_b'],
- 'exports': {
- 1: ['foo.host_a', 'foo.host_b'],
- 2: ['foo.host_a', 'foo.host_b'],
- 3: ['foo.host_a', 'foo.host_b'] # for new-added export
- }
- }
- }
-
- # Unset user-defined location.
- self._set_user_defined_clusters_location('')
-
- self.temp_store_namespace = None
- self._reset_temp_store()
-
- self.io_mock = MagicMock()
- self.io_mock.set_namespace.side_effect = self._ioctx_set_namespace_mock
- self.io_mock.read = self._ioctl_read_mock
- self.io_mock.stat = self._ioctl_stat_mock
- self.io_mock.list_objects.side_effect = self._ioctx_list_objects_mock
- self.io_mock.write_full.side_effect = self._ioctx_write_full_mock
- self.io_mock.remove_object.side_effect = self._ioctx_remove_mock
-
- ioctx_mock = MagicMock()
- ioctx_mock.__enter__ = Mock(return_value=(self.io_mock))
- ioctx_mock.__exit__ = Mock(return_value=None)
-
- mgr.rados = MagicMock()
- mgr.rados.open_ioctx.return_value = ioctx_mock
-
- self._mock_orchestrator(True)
-
- ganesha.CephX = MagicMock()
- ganesha.CephX.list_clients.return_value = ['ganesha']
- ganesha.CephX.get_client_key.return_value = 'ganesha'
-
- ganesha.CephFS = MagicMock()
-
- def _reset_temp_store(self):
- self.temp_store_namespace = None
- self.temp_store = {
- 'bar': {
- 'export-1': GaneshaConfTest.RObject("export-1", self.export_1),
- 'export-2': GaneshaConfTest.RObject("export-2", self.export_2),
- 'conf-nodea': GaneshaConfTest.RObject("conf-nodea", self.conf_nodea),
- 'conf-nodeb': GaneshaConfTest.RObject("conf-nodeb", self.conf_nodeb),
- },
- 'foo': {
- 'export-1': GaneshaConfTest.RObject("export-1", self.export_1),
- 'export-2': GaneshaConfTest.RObject("export-2", self.export_2),
- 'conf-nfs.foo': GaneshaConfTest.RObject("conf-nfs.foo", self.conf_nfs_foo)
- }
- }
-
- def _mock_orchestrator(self, enable):
- # mock nfs services
- orch_nfs_services = [
- ServiceDescription(spec=NFSServiceSpec(service_id='foo'))
- ] if enable else []
- # pylint: disable=protected-access
- ganesha.Ganesha._get_orch_nfs_services = Mock(return_value=orch_nfs_services)
-
- # mock nfs daemons
- def _get_nfs_instances(service_name=None):
- if not enable:
- return []
- instances = {
- 'nfs.foo': [
- DaemonDescription(daemon_id='foo.host_a', status=1),
- DaemonDescription(daemon_id='foo.host_b', status=1)
- ],
- 'nfs.bar': [
- DaemonDescription(daemon_id='bar.host_c', status=1)
- ]
- }
- if service_name is not None:
- return instances[service_name]
- result = []
- for _, daemons in instances.items():
- result.extend(daemons)
- return result
- ganesha.GaneshaConfOrchestrator._get_orch_nfs_instances = Mock(
- side_effect=_get_nfs_instances)
-
- def test_parse_daemon_raw_config(self):
- expected_daemon_config = [
- {
- "block_name": "NFS_CORE_PARAM",
- "enable_nlm": False,
- "enable_rquota": False,
- "protocols": 4,
- "nfs_port": 14000
- },
- {
- "block_name": "MDCACHE",
- "dir_chunk": 0
- },
- {
- "block_name": "NFSV4",
- "recoverybackend": "rados_cluster",
- "minor_versions": [1, 2]
- },
- {
- "block_name": "RADOS_KV",
- "pool": "nfs-ganesha",
- "namespace": "vstart",
- "userid": "vstart",
- "nodeid": "a"
- },
- {
- "block_name": "RADOS_URLS",
- "userid": "vstart",
- "watch_url": "'rados://nfs-ganesha/vstart/conf-nfs.vstart'"
- },
- {
- "block_name": "%url",
- "value": "rados://nfs-ganesha/vstart/conf-nfs.vstart"
- }
- ]
- daemon_config = GaneshaConfParser(self.daemon_raw_config).parse()
- self.assertEqual(daemon_config, expected_daemon_config)
-
- def test_export_parser_1(self):
- blocks = GaneshaConfParser(self.export_1).parse()
- self.assertIsInstance(blocks, list)
- self.assertEqual(len(blocks), 1)
- export = Export.from_export_block(blocks[0], '_default_',
- GaneshaConf.ganesha_defaults({}))
-
- self.assertEqual(export.export_id, 1)
- self.assertEqual(export.path, "/")
- self.assertEqual(export.pseudo, "/cephfs_a")
- self.assertIsNone(export.tag)
- self.assertEqual(export.access_type, "RW")
- self.assertEqual(export.squash, "root_squash")
- self.assertEqual(export.protocols, {4})
- self.assertEqual(export.transports, {"TCP", "UDP"})
- self.assertEqual(export.fsal.name, "CEPH")
- self.assertEqual(export.fsal.user_id, "ganesha")
- self.assertEqual(export.fsal.fs_name, "a")
- self.assertEqual(export.fsal.sec_label_xattr, None)
- self.assertEqual(len(export.clients), 2)
- self.assertEqual(export.clients[0].addresses,
- ["192.168.0.10", "192.168.1.0/8"])
- self.assertEqual(export.clients[0].squash, "no_root_squash")
- self.assertIsNone(export.clients[0].access_type)
- self.assertEqual(export.clients[1].addresses, ["192.168.0.0/16"])
- self.assertEqual(export.clients[1].squash, "all_squash")
- self.assertEqual(export.clients[1].access_type, "RO")
- self.assertEqual(export.cluster_id, '_default_')
- self.assertEqual(export.attr_expiration_time, 0)
- self.assertEqual(export.security_label, False)
-
- def test_export_parser_2(self):
- blocks = GaneshaConfParser(self.export_2).parse()
- self.assertIsInstance(blocks, list)
- self.assertEqual(len(blocks), 1)
- export = Export.from_export_block(blocks[0], '_default_',
- GaneshaConf.ganesha_defaults({}))
-
- self.assertEqual(export.export_id, 2)
- self.assertEqual(export.path, "/")
- self.assertEqual(export.pseudo, "/rgw")
- self.assertIsNone(export.tag)
- self.assertEqual(export.access_type, "RW")
- self.assertEqual(export.squash, "all_squash")
- self.assertEqual(export.protocols, {4, 3})
- self.assertEqual(export.transports, {"TCP", "UDP"})
- self.assertEqual(export.fsal.name, "RGW")
- self.assertEqual(export.fsal.rgw_user_id, "testuser")
- self.assertEqual(export.fsal.access_key, "access_key")
- self.assertEqual(export.fsal.secret_key, "secret_key")
- self.assertEqual(len(export.clients), 0)
- self.assertEqual(export.cluster_id, '_default_')
-
- def test_daemon_conf_parser_a(self):
- blocks = GaneshaConfParser(self.conf_nodea).parse()
- self.assertIsInstance(blocks, list)
- self.assertEqual(len(blocks), 2)
- self.assertEqual(blocks[0]['block_name'], "%url")
- self.assertEqual(blocks[0]['value'], "rados://nfs-ganesha/bar/export-2")
- self.assertEqual(blocks[1]['block_name'], "%url")
- self.assertEqual(blocks[1]['value'], "rados://nfs-ganesha/bar/export-1")
-
- def test_daemon_conf_parser_b(self):
- blocks = GaneshaConfParser(self.conf_nodeb).parse()
- self.assertIsInstance(blocks, list)
- self.assertEqual(len(blocks), 1)
- self.assertEqual(blocks[0]['block_name'], "%url")
- self.assertEqual(blocks[0]['value'], "rados://nfs-ganesha/bar/export-1")
-
- def test_ganesha_conf(self):
- for cluster_id, info in self.clusters.items():
- self._do_test_ganesha_conf(cluster_id, info['exports'])
- self._reset_temp_store()
-
- def _do_test_ganesha_conf(self, cluster, expected_exports):
- ganesha_conf = GaneshaConf.instance(cluster)
- exports = ganesha_conf.exports
-
- self.assertEqual(len(exports.items()), 2)
- self.assertIn(1, exports)
- self.assertIn(2, exports)
-
- # export_id = 1 asserts
- export = exports[1]
- self.assertEqual(export.export_id, 1)
- self.assertEqual(export.path, "/")
- self.assertEqual(export.pseudo, "/cephfs_a")
- self.assertIsNone(export.tag)
- self.assertEqual(export.access_type, "RW")
- self.assertEqual(export.squash, "root_squash")
- self.assertEqual(export.protocols, {4})
- self.assertEqual(export.transports, {"TCP", "UDP"})
- self.assertEqual(export.fsal.name, "CEPH")
- self.assertEqual(export.fsal.user_id, "ganesha")
- self.assertEqual(export.fsal.fs_name, "a")
- self.assertEqual(export.fsal.sec_label_xattr, None)
- self.assertEqual(len(export.clients), 2)
- self.assertEqual(export.clients[0].addresses,
- ["192.168.0.10", "192.168.1.0/8"])
- self.assertEqual(export.clients[0].squash, "no_root_squash")
- self.assertIsNone(export.clients[0].access_type)
- self.assertEqual(export.clients[1].addresses, ["192.168.0.0/16"])
- self.assertEqual(export.clients[1].squash, "all_squash")
- self.assertEqual(export.clients[1].access_type, "RO")
- self.assertEqual(export.attr_expiration_time, 0)
- self.assertEqual(export.security_label, False)
- self.assertSetEqual(export.daemons, set(expected_exports[1]))
-
- # export_id = 2 asserts
- export = exports[2]
- self.assertEqual(export.export_id, 2)
- self.assertEqual(export.path, "/")
- self.assertEqual(export.pseudo, "/rgw")
- self.assertIsNone(export.tag)
- self.assertEqual(export.access_type, "RW")
- self.assertEqual(export.squash, "all_squash")
- self.assertEqual(export.protocols, {4, 3})
- self.assertEqual(export.transports, {"TCP", "UDP"})
- self.assertEqual(export.fsal.name, "RGW")
- self.assertEqual(export.fsal.rgw_user_id, "testuser")
- self.assertEqual(export.fsal.access_key, "access_key")
- self.assertEqual(export.fsal.secret_key, "secret_key")
- self.assertEqual(len(export.clients), 0)
- self.assertSetEqual(export.daemons, set(expected_exports[2]))
-
- def test_config_dict(self):
- for cluster_id, info in self.clusters.items():
- self._do_test_config_dict(cluster_id, info['exports'])
- self._reset_temp_store()
-
- def _do_test_config_dict(self, cluster, expected_exports):
- conf = GaneshaConf.instance(cluster)
- export = conf.exports[1]
- ex_dict = export.to_dict()
- self.assertDictEqual(ex_dict, {
- 'daemons': expected_exports[1],
- 'export_id': 1,
- 'path': '/',
- 'pseudo': '/cephfs_a',
- 'cluster_id': cluster,
- 'tag': None,
- 'access_type': 'RW',
- 'squash': 'root_squash',
- 'security_label': False,
- 'protocols': [4],
- 'transports': ['TCP', 'UDP'],
- 'clients': [{
- 'addresses': ["192.168.0.10", "192.168.1.0/8"],
- 'access_type': None,
- 'squash': 'no_root_squash'
- }, {
- 'addresses': ["192.168.0.0/16"],
- 'access_type': 'RO',
- 'squash': 'all_squash'
- }],
- 'fsal': {
- 'name': 'CEPH',
- 'user_id': 'ganesha',
- 'fs_name': 'a',
- 'sec_label_xattr': None
- }
- })
-
- export = conf.exports[2]
- ex_dict = export.to_dict()
- self.assertDictEqual(ex_dict, {
- 'daemons': expected_exports[2],
- 'export_id': 2,
- 'path': '/',
- 'pseudo': '/rgw',
- 'cluster_id': cluster,
- 'tag': None,
- 'access_type': 'RW',
- 'squash': 'all_squash',
- 'security_label': False,
- 'protocols': [3, 4],
- 'transports': ['TCP', 'UDP'],
- 'clients': [],
- 'fsal': {
- 'name': 'RGW',
- 'rgw_user_id': 'testuser'
- }
- })
-
- def test_config_from_dict(self):
- for cluster_id, info in self.clusters.items():
- self._do_test_config_from_dict(cluster_id, info['exports'])
- self._reset_temp_store()
-
- def _do_test_config_from_dict(self, cluster_id, expected_exports):
- export = Export.from_dict(1, {
- 'daemons': expected_exports[1],
- 'export_id': 1,
- 'path': '/',
- 'cluster_id': cluster_id,
- 'pseudo': '/cephfs_a',
- 'tag': None,
- 'access_type': 'RW',
- 'squash': 'root_squash',
- 'security_label': True,
- 'protocols': [4],
- 'transports': ['TCP', 'UDP'],
- 'clients': [{
- 'addresses': ["192.168.0.10", "192.168.1.0/8"],
- 'access_type': None,
- 'squash': 'no_root_squash'
- }, {
- 'addresses': ["192.168.0.0/16"],
- 'access_type': 'RO',
- 'squash': 'all_squash'
- }],
- 'fsal': {
- 'name': 'CEPH',
- 'user_id': 'ganesha',
- 'fs_name': 'a',
- 'sec_label_xattr': 'security.selinux'
- }
- })
-
- self.assertEqual(export.export_id, 1)
- self.assertEqual(export.path, "/")
- self.assertEqual(export.pseudo, "/cephfs_a")
- self.assertIsNone(export.tag)
- self.assertEqual(export.access_type, "RW")
- self.assertEqual(export.squash, "root_squash")
- self.assertEqual(export.protocols, {4})
- self.assertEqual(export.transports, {"TCP", "UDP"})
- self.assertEqual(export.fsal.name, "CEPH")
- self.assertEqual(export.fsal.user_id, "ganesha")
- self.assertEqual(export.fsal.fs_name, "a")
- self.assertEqual(export.fsal.sec_label_xattr, 'security.selinux')
- self.assertEqual(len(export.clients), 2)
- self.assertEqual(export.clients[0].addresses,
- ["192.168.0.10", "192.168.1.0/8"])
- self.assertEqual(export.clients[0].squash, "no_root_squash")
- self.assertIsNone(export.clients[0].access_type)
- self.assertEqual(export.clients[1].addresses, ["192.168.0.0/16"])
- self.assertEqual(export.clients[1].squash, "all_squash")
- self.assertEqual(export.clients[1].access_type, "RO")
- self.assertEqual(export.daemons, set(expected_exports[1]))
- self.assertEqual(export.cluster_id, cluster_id)
- self.assertEqual(export.attr_expiration_time, 0)
- self.assertEqual(export.security_label, True)
-
- export = Export.from_dict(2, {
- 'daemons': expected_exports[2],
- 'export_id': 2,
- 'path': '/',
- 'pseudo': '/rgw',
- 'cluster_id': cluster_id,
- 'tag': None,
- 'access_type': 'RW',
- 'squash': 'all_squash',
- 'security_label': False,
- 'protocols': [4, 3],
- 'transports': ['TCP', 'UDP'],
- 'clients': [],
- 'fsal': {
- 'name': 'RGW',
- 'rgw_user_id': 'testuser'
- }
- })
-
- self.assertEqual(export.export_id, 2)
- self.assertEqual(export.path, "/")
- self.assertEqual(export.pseudo, "/rgw")
- self.assertIsNone(export.tag)
- self.assertEqual(export.access_type, "RW")
- self.assertEqual(export.squash, "all_squash")
- self.assertEqual(export.protocols, {4, 3})
- self.assertEqual(export.transports, {"TCP", "UDP"})
- self.assertEqual(export.fsal.name, "RGW")
- self.assertEqual(export.fsal.rgw_user_id, "testuser")
- self.assertIsNone(export.fsal.access_key)
- self.assertIsNone(export.fsal.secret_key)
- self.assertEqual(len(export.clients), 0)
- self.assertEqual(export.daemons, set(expected_exports[2]))
- self.assertEqual(export.cluster_id, cluster_id)
-
- def test_gen_raw_config(self):
- for cluster_id, info in self.clusters.items():
- self._do_test_gen_raw_config(cluster_id, info['exports'])
- self._reset_temp_store()
-
- def _do_test_gen_raw_config(self, cluster_id, expected_exports):
- conf = GaneshaConf.instance(cluster_id)
- # pylint: disable=W0212
- export = conf.exports[1]
- del conf.exports[1]
- conf._save_export(export)
- conf = GaneshaConf.instance(cluster_id)
- exports = conf.exports
- self.assertEqual(len(exports.items()), 2)
- self.assertIn(1, exports)
- self.assertIn(2, exports)
-
- # export_id = 1 asserts
- export = exports[1]
- self.assertEqual(export.export_id, 1)
- self.assertEqual(export.path, "/")
- self.assertEqual(export.pseudo, "/cephfs_a")
- self.assertIsNone(export.tag)
- self.assertEqual(export.access_type, "RW")
- self.assertEqual(export.squash, "root_squash")
- self.assertEqual(export.protocols, {4})
- self.assertEqual(export.transports, {"TCP", "UDP"})
- self.assertEqual(export.fsal.name, "CEPH")
- self.assertEqual(export.fsal.user_id, "ganesha")
- self.assertEqual(export.fsal.fs_name, "a")
- self.assertEqual(export.fsal.sec_label_xattr, None)
- self.assertEqual(len(export.clients), 2)
- self.assertEqual(export.clients[0].addresses,
- ["192.168.0.10", "192.168.1.0/8"])
- self.assertEqual(export.clients[0].squash, "no_root_squash")
- self.assertIsNone(export.clients[0].access_type)
- self.assertEqual(export.clients[1].addresses, ["192.168.0.0/16"])
- self.assertEqual(export.clients[1].squash, "all_squash")
- self.assertEqual(export.clients[1].access_type, "RO")
- self.assertEqual(export.daemons, set(expected_exports[1]))
- self.assertEqual(export.cluster_id, cluster_id)
- self.assertEqual(export.attr_expiration_time, 0)
- self.assertEqual(export.security_label, False)
-
- # export_id = 2 asserts
- export = exports[2]
- self.assertEqual(export.export_id, 2)
- self.assertEqual(export.path, "/")
- self.assertEqual(export.pseudo, "/rgw")
- self.assertIsNone(export.tag)
- self.assertEqual(export.access_type, "RW")
- self.assertEqual(export.squash, "all_squash")
- self.assertEqual(export.protocols, {4, 3})
- self.assertEqual(export.transports, {"TCP", "UDP"})
- self.assertEqual(export.fsal.name, "RGW")
- self.assertEqual(export.fsal.rgw_user_id, "testuser")
- self.assertEqual(export.fsal.access_key, "access_key")
- self.assertEqual(export.fsal.secret_key, "secret_key")
- self.assertEqual(len(export.clients), 0)
- self.assertEqual(export.daemons, set(expected_exports[2]))
- self.assertEqual(export.cluster_id, cluster_id)
-
- def test_update_export(self):
- for cluster_id, info in self.clusters.items():
- self._do_test_update_export(cluster_id, info['exports'])
- self._reset_temp_store()
-
- def _do_test_update_export(self, cluster_id, expected_exports):
- ganesha.RgwClient = MagicMock()
- admin_inst_mock = MagicMock()
- admin_inst_mock.get_user_keys.return_value = {
- 'access_key': 'access_key',
- 'secret_key': 'secret_key'
- }
- ganesha.RgwClient.admin_instance.return_value = admin_inst_mock
-
- conf = GaneshaConf.instance(cluster_id)
- conf.update_export({
- 'export_id': 2,
- 'daemons': expected_exports[2],
- 'path': 'bucket',
- 'pseudo': '/rgw/bucket',
- 'cluster_id': cluster_id,
- 'tag': 'bucket_tag',
- 'access_type': 'RW',
- 'squash': 'all_squash',
- 'security_label': False,
- 'protocols': [4, 3],
- 'transports': ['TCP', 'UDP'],
- 'clients': [{
- 'addresses': ["192.168.0.0/16"],
- 'access_type': None,
- 'squash': None
- }],
- 'fsal': {
- 'name': 'RGW',
- 'rgw_user_id': 'testuser'
- }
- })
-
- conf = GaneshaConf.instance(cluster_id)
- export = conf.get_export(2)
- self.assertEqual(export.export_id, 2)
- self.assertEqual(export.path, "bucket")
- self.assertEqual(export.pseudo, "/rgw/bucket")
- self.assertEqual(export.tag, "bucket_tag")
- self.assertEqual(export.access_type, "RW")
- self.assertEqual(export.squash, "all_squash")
- self.assertEqual(export.protocols, {4, 3})
- self.assertEqual(export.transports, {"TCP", "UDP"})
- self.assertEqual(export.fsal.name, "RGW")
- self.assertEqual(export.fsal.rgw_user_id, "testuser")
- self.assertEqual(export.fsal.access_key, "access_key")
- self.assertEqual(export.fsal.secret_key, "secret_key")
- self.assertEqual(len(export.clients), 1)
- self.assertEqual(export.clients[0].addresses, ["192.168.0.0/16"])
- self.assertIsNone(export.clients[0].squash)
- self.assertIsNone(export.clients[0].access_type)
- self.assertEqual(export.daemons, set(expected_exports[2]))
- self.assertEqual(export.cluster_id, cluster_id)
-
- def test_remove_export(self):
- for cluster_id, info in self.clusters.items():
- self._do_test_remove_export(cluster_id, info['exports'])
- self._reset_temp_store()
-
- def _do_test_remove_export(self, cluster_id, expected_exports):
- conf = GaneshaConf.instance(cluster_id)
- conf.remove_export(1)
- exports = conf.list_exports()
- self.assertEqual(len(exports), 1)
- self.assertEqual(2, exports[0].export_id)
- export = conf.get_export(2)
- self.assertEqual(export.export_id, 2)
- self.assertEqual(export.path, "/")
- self.assertEqual(export.pseudo, "/rgw")
- self.assertIsNone(export.tag)
- self.assertEqual(export.access_type, "RW")
- self.assertEqual(export.squash, "all_squash")
- self.assertEqual(export.protocols, {4, 3})
- self.assertEqual(export.transports, {"TCP", "UDP"})
- self.assertEqual(export.fsal.name, "RGW")
- self.assertEqual(export.fsal.rgw_user_id, "testuser")
- self.assertEqual(export.fsal.access_key, "access_key")
- self.assertEqual(export.fsal.secret_key, "secret_key")
- self.assertEqual(len(export.clients), 0)
- self.assertEqual(export.daemons, set(expected_exports[2]))
- self.assertEqual(export.cluster_id, cluster_id)
-
- def test_create_export_rgw(self):
- for cluster_id, info in self.clusters.items():
- self._do_test_create_export_rgw(cluster_id, info['exports'])
- self._reset_temp_store()
-
- def _do_test_create_export_rgw(self, cluster_id, expected_exports):
- ganesha.RgwClient = MagicMock()
- admin_inst_mock = MagicMock()
- admin_inst_mock.get_user_keys.return_value = {
- 'access_key': 'access_key2',
- 'secret_key': 'secret_key2'
- }
- ganesha.RgwClient.admin_instance.return_value = admin_inst_mock
-
- conf = GaneshaConf.instance(cluster_id)
- ex_id = conf.create_export({
- 'daemons': expected_exports[3],
- 'path': 'bucket',
- 'pseudo': '/rgw/bucket',
- 'tag': 'bucket_tag',
- 'cluster_id': cluster_id,
- 'access_type': 'RW',
- 'squash': 'all_squash',
- 'security_label': False,
- 'protocols': [4, 3],
- 'transports': ['TCP', 'UDP'],
- 'clients': [{
- 'addresses': ["192.168.0.0/16"],
- 'access_type': None,
- 'squash': None
- }],
- 'fsal': {
- 'name': 'RGW',
- 'rgw_user_id': 'testuser'
- }
- })
-
- conf = GaneshaConf.instance(cluster_id)
- exports = conf.list_exports()
- self.assertEqual(len(exports), 3)
- export = conf.get_export(ex_id)
- self.assertEqual(export.export_id, ex_id)
- self.assertEqual(export.path, "bucket")
- self.assertEqual(export.pseudo, "/rgw/bucket")
- self.assertEqual(export.tag, "bucket_tag")
- self.assertEqual(export.access_type, "RW")
- self.assertEqual(export.squash, "all_squash")
- self.assertEqual(export.protocols, {4, 3})
- self.assertEqual(export.transports, {"TCP", "UDP"})
- self.assertEqual(export.fsal.name, "RGW")
- self.assertEqual(export.fsal.rgw_user_id, "testuser")
- self.assertEqual(export.fsal.access_key, "access_key2")
- self.assertEqual(export.fsal.secret_key, "secret_key2")
- self.assertEqual(len(export.clients), 1)
- self.assertEqual(export.clients[0].addresses, ["192.168.0.0/16"])
- self.assertIsNone(export.clients[0].squash)
- self.assertIsNone(export.clients[0].access_type)
- self.assertEqual(export.daemons, set(expected_exports[3]))
- self.assertEqual(export.cluster_id, cluster_id)
-
- def test_create_export_cephfs(self):
- for cluster_id, info in self.clusters.items():
- self._do_test_create_export_cephfs(cluster_id, info['exports'])
- self._reset_temp_store()
-
- def _do_test_create_export_cephfs(self, cluster_id, expected_exports):
- ganesha.CephX = MagicMock()
- ganesha.CephX.list_clients.return_value = ["fs"]
- ganesha.CephX.get_client_key.return_value = "fs_key"
-
- ganesha.CephFS = MagicMock()
- ganesha.CephFS.dir_exists.return_value = True
-
- conf = GaneshaConf.instance(cluster_id)
- ex_id = conf.create_export({
- 'daemons': expected_exports[3],
- 'path': '/',
- 'pseudo': '/cephfs2',
- 'cluster_id': cluster_id,
- 'tag': None,
- 'access_type': 'RW',
- 'squash': 'all_squash',
- 'security_label': True,
- 'protocols': [4],
- 'transports': ['TCP'],
- 'clients': [],
- 'fsal': {
- 'name': 'CEPH',
- 'user_id': 'fs',
- 'fs_name': None,
- 'sec_label_xattr': 'security.selinux'
- }
- })
-
- conf = GaneshaConf.instance(cluster_id)
- exports = conf.list_exports()
- self.assertEqual(len(exports), 3)
- export = conf.get_export(ex_id)
- self.assertEqual(export.export_id, ex_id)
- self.assertEqual(export.path, "/")
- self.assertEqual(export.pseudo, "/cephfs2")
- self.assertIsNone(export.tag)
- self.assertEqual(export.access_type, "RW")
- self.assertEqual(export.squash, "all_squash")
- self.assertEqual(export.protocols, {4})
- self.assertEqual(export.transports, {"TCP"})
- self.assertEqual(export.fsal.name, "CEPH")
- self.assertEqual(export.fsal.user_id, "fs")
- self.assertEqual(export.fsal.cephx_key, "fs_key")
- self.assertEqual(export.fsal.sec_label_xattr, "security.selinux")
- self.assertIsNone(export.fsal.fs_name)
- self.assertEqual(len(export.clients), 0)
- self.assertEqual(export.daemons, set(expected_exports[3]))
- self.assertEqual(export.cluster_id, cluster_id)
- self.assertEqual(export.attr_expiration_time, 0)
- self.assertEqual(export.security_label, True)
-
- def test_reload_daemons(self):
- # Fail to import call in Python 3.8, see https://bugs.python.org/issue35753
- mock_call = unittest.mock.call
-
- # Orchestrator cluster: reload all daemon config objects.
- conf = GaneshaConf.instance('foo')
- calls = [mock_call(conf) for conf in conf.list_daemon_confs()]
- for daemons in [[], ['a', 'b']]:
- conf.reload_daemons(daemons)
- self.io_mock.notify.assert_has_calls(calls)
- self.io_mock.reset_mock()
-
- # User-defined cluster: reload daemons in the parameter
- self._set_user_defined_clusters_location()
- conf = GaneshaConf.instance('_default_')
- calls = [mock_call('conf-{}'.format(daemon)) for daemon in ['nodea', 'nodeb']]
- conf.reload_daemons(['nodea', 'nodeb'])
- self.io_mock.notify.assert_has_calls(calls)
-
- def test_list_daemons(self):
- for cluster_id, info in self.clusters.items():
- instance = GaneshaConf.instance(cluster_id)
- daemons = instance.list_daemons()
- for daemon in daemons:
- self.assertEqual(daemon['cluster_id'], cluster_id)
- self.assertEqual(daemon['cluster_type'], info['type'])
- self.assertIn('daemon_id', daemon)
- self.assertIn('status', daemon)
- self.assertIn('status_desc', daemon)
- self.assertEqual([daemon['daemon_id'] for daemon in daemons], info['daemons'])
-
- def test_validate_orchestrator(self):
- cluster_id = 'foo'
- cluster_info = self.clusters[cluster_id]
- instance = GaneshaConf.instance(cluster_id)
- export = MagicMock()
-
- # export can be linked to none or all daemons
- export_daemons = [[], cluster_info['daemons']]
- for daemons in export_daemons:
- export.daemons = daemons
- instance.validate(export)
-
- # raise if linking to partial or non-exist daemons
- export_daemons = [cluster_info['daemons'][:1], 'xxx']
- for daemons in export_daemons:
- with self.assertRaises(NFSException):
- export.daemons = daemons
- instance.validate(export)
-
- def test_validate_user(self):
- self._set_user_defined_clusters_location()
- cluster_id = '_default_'
- instance = GaneshaConf.instance(cluster_id)
- export = MagicMock()
-
- # export can be linked to none, partial, or all daemons
- fake_daemons = ['nodea', 'nodeb']
- export_daemons = [[], fake_daemons[:1], fake_daemons]
- for daemons in export_daemons:
- export.daemons = daemons
- instance.validate(export)
-
- # raise if linking to non-exist daemons
- export_daemons = ['xxx']
- for daemons in export_daemons:
- with self.assertRaises(NFSException):
- export.daemons = daemons
- instance.validate(export)
-
- def _verify_locations(self, locations, cluster_ids):
- for cluster_id in cluster_ids:
- self.assertIn(cluster_id, locations)
- cluster = locations.pop(cluster_id)
- self.assertDictEqual(cluster, {key: cluster[key] for key in [
- 'pool', 'namespace', 'type', 'daemon_conf']})
- self.assertDictEqual(locations, {})
-
- def test_get_cluster_locations(self):
- # pylint: disable=protected-access
-
- # There is only a Orchestrator cluster.
- self._mock_orchestrator(True)
- locations = ganesha.Ganesha._get_clusters_locations()
- self._verify_locations(locations, ['foo'])
-
- # No cluster.
- self._mock_orchestrator(False)
- with self.assertRaises(NFSException):
- ganesha.Ganesha._get_clusters_locations()
-
- # There is only a user-defined cluster.
- self._set_user_defined_clusters_location()
- self._mock_orchestrator(False)
- locations = ganesha.Ganesha._get_clusters_locations()
- self._verify_locations(locations, ['_default_'])
-
- # There are both Orchestrator cluster and user-defined cluster.
- self._set_user_defined_clusters_location()
- self._mock_orchestrator(True)
- locations = ganesha.Ganesha._get_clusters_locations()
- self._verify_locations(locations, ['foo', '_default_'])
-
- def test_get_cluster_locations_conflict(self):
- # pylint: disable=protected-access
-
- # Pool/namespace collision.
- self._set_user_defined_clusters_location('nfs-ganesha/foo')
- with self.assertRaises(NFSException) as ctx:
- ganesha.Ganesha._get_clusters_locations()
- self.assertIn('already in use', str(ctx.exception))
-
- # Cluster name collision with orch. cluster.
- self._set_user_defined_clusters_location('foo:nfs-ganesha/bar')
- with self.assertRaises(NFSException) as ctx:
- ganesha.Ganesha._get_clusters_locations()
- self.assertIn('Detected a conflicting NFS-Ganesha cluster', str(ctx.exception))
-
- # Cluster name collision with user-defined cluster.
- self._set_user_defined_clusters_location(
- 'cluster1:nfs-ganesha/bar,cluster1:fake-pool/fake-ns'
- )
- with self.assertRaises(NFSException) as ctx:
- ganesha.Ganesha._get_clusters_locations()
- self.assertIn('Duplicate Ganesha cluster definition', str(ctx.exception))
class NFSGaneshaUiControllerTest(ControllerTestCase):
except Exception as e:
return exception_handler(e, "Failed to list NFS Cluster")
+ def list_daemons(self):
+ completion = self.mgr.list_daemons(daemon_type='nfs')
+ # Here completion.result is a list DaemonDescription objects
+ daemons = orchestrator.raise_if_exception(completion)
+ return [
+ {
+ 'cluster_id': instance.service_id(),
+ 'daemon_id': instance.daemon_id,
+ 'cluster_type': 'orchestrator',
+ 'status': instance.status,
+ 'status_desc': instance.status_desc
+ } for instance in daemons
+ ]
+
def _show_nfs_cluster_info(self, cluster_id: str) -> Dict[str, Any]:
completion = self.mgr.list_daemons(daemon_type='nfs')
# Here completion.result is a list DaemonDescription objects
log.info('no exports for cluster %s', cluster_id)
return None
+ def _fetch_export_id(
+ self,
+ cluster_id: str,
+ export_id: int
+ ) -> Optional[Export]:
+ try:
+ for ex in self.exports[cluster_id]:
+ if ex.export_id == export_id:
+ return ex
+ return None
+ except KeyError:
+ log.info(f'no exports for cluster {cluster_id}')
+ return None
+
def _delete_export_user(self, export: Export) -> None:
if isinstance(export.fsal, CephFSFSAL):
assert export.fsal.user_id
raise NFSException(f"Failed to delete exports: {err} and {ret}")
log.info("All exports successfully deleted for cluster id: %s", cluster_id)
+ def list_all_exports(self):
+ r = []
+ for cluster_id, ls in self.exports.items():
+ r.extend([e.to_dict() for e in ls])
+ return r
+
@export_cluster_checker
def list_exports(self,
cluster_id: str,
except Exception as e:
return exception_handler(e, f"Failed to get {pseudo_path} export for {cluster_id}")
+ def get_export_by_id(
+ self,
+ cluster_id: str,
+ export_id: int
+ ) -> Dict[Any, Any]:
+ export = self._fetch_export_id(cluster_id, export_id)
+ return export.to_dict() if export else None
+
def apply_export(self, cluster_id: str, export_config: str) -> Tuple[int, str, str]:
try:
if not export_config:
ret, out, err = (0, '', '')
for export in j:
try:
- r, o, e = self._apply_export(cluster_id, export)
+ r, o, e, ex = self._apply_export(cluster_id, export)
except Exception as ex:
- r, o, e = exception_handler(ex, f'Failed to apply export: {ex}')
+ r, o, e, ex = exception_handler(ex, f'Failed to apply export: {ex}')
if r:
ret = r
if o:
err += e + '\n'
return ret, out, err
else:
- return self._apply_export(cluster_id, j)
+ r, o, e, ex = self._apply_export(cluster_id, j)
+ return r, o, e
except NotImplementedError:
return 0, " Manual Restart of NFS PODS required for successful update of exports", ""
except Exception as e:
self,
cluster_id: str,
new_export_dict: Dict,
- ) -> Tuple[int, str, str]:
+ ) -> Tuple[int, str, str, Export]:
for k in ['path', 'pseudo']:
if k not in new_export_dict:
raise NFSInvalidOperation(f'Export missing required field {k}')
if not old_export:
self._create_export_user(new_export)
self._save_export(cluster_id, new_export)
- return 0, f'Added export {new_export.pseudo}', ''
+ return 0, f'Added export {new_export.pseudo}', '', new_export
if old_export.fsal.name != new_export.fsal.name:
raise NFSInvalidOperation('FSAL change not allowed')
# TODO: detect whether the update is such that a reload is sufficient
restart_nfs_service(self.mgr, new_export.cluster_id)
- return 0, f"Updated export {new_export.pseudo}", ""
+ return 0, f"Updated export {new_export.pseudo}", "", new_export
import logging
import threading
-from typing import Tuple, Optional, List
+from typing import Tuple, Optional, List, Dict, Any
from mgr_module import MgrModule, CLICommand, Option, CLICheckNonemptyFileInput
import orchestrator
from .export import ExportMgr
from .cluster import NFSCluster
-from typing import Any
+from .utils import available_clusters
log = logging.getLogger(__name__)
def _cmd_nfs_cluster_config_reset(self, cluster_id: str) -> Tuple[int, str, str]:
"""Reset NFS-Ganesha Config to default"""
return self.nfs.reset_nfs_cluster_config(cluster_id=cluster_id)
+
+ def is_active(self) -> bool:
+ return True
+
+ def export_ls(self) -> List[Dict[Any, Any]]:
+ return self.export_mgr.list_all_exports()
+
+ def export_get(self, cluster_id: str, export_id: int) -> Dict[Any, Any]:
+ return self.export_mgr.get_export_by_id(cluster_id, export_id)
+
+ def export_rm(self, cluster_id: str, pseudo: str) -> None:
+ self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo)
+
+ def daemon_ls(self) -> List[Dict[Any, Any]]:
+ return self.nfs.list_daemons()
+
+ def cluster_ls(self) -> List[str]:
+ return [
+ {
+ 'pool': NFS_POOL_NAME,
+ 'namespace': cluster_id,
+ 'type': 'orchestrator',
+ 'daemon_conf': None,
+ } for cluster_id in available_clusters()
+ ]
+
+ def cluster_fsals(self) -> List[str]:
+ return ['CEPH', 'RGW']
+
+ def export_apply(self, cluster_id: str, export: Dict[Any, Any]) -> Dict[Any, Any]:
+ ret, out, err, export = self.export_mgr._apply_export(cluster_id, export)
+ if ret:
+ return None
+ return export.to_dict()