]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/dashboard: support Orchestrator and user-defined Ganesha clusters
authorKiefer Chang <kiefer.chang@suse.com>
Wed, 2 Sep 2020 12:28:36 +0000 (20:28 +0800)
committerKiefer Chang <kiefer.chang@suse.com>
Mon, 19 Oct 2020 11:11:14 +0000 (19:11 +0800)
This change make the Dashboard support two types of Ganesha clusters:

- Orchestrator clusters (Since Octopus)
  - Deployed by the Orchestrator.
  - The Dashboard gets the pool/namespace that stores Ganesha
    configuration objects from the Orchestrator.
  - The Dashboard gets the daemons in a cluster from the Orchestrator.

- User-defined clusters (Since Nautilus)
  - Clusters defined by using `ceph dashboard
    set-ganesha-clusters-rados-pool-namespace` command is treated as
    user-defined clusters.
  - Each daemon has its own RADOS configuration objects. The
    Dashboard uses these objects to deduce daemons.

Fixes: https://tracker.ceph.com/issues/46492
Signed-off-by: Kiefer Chang <kiefer.chang@suse.com>
qa/tasks/mgr/dashboard/test_ganesha.py
src/pybind/mgr/dashboard/controllers/nfsganesha.py
src/pybind/mgr/dashboard/openapi.yaml
src/pybind/mgr/dashboard/services/ganesha.py
src/pybind/mgr/dashboard/tests/test_ganesha.py

index 60ec76f78b2290b3f02a336ce4a9fb78195684e6..229d8af8195393d3d93352d9bb84e5593b1be052 100644 (file)
@@ -161,7 +161,9 @@ class GaneshaTest(DashboardTestCase):
         self.assertIn('available', data)
         self.assertIn('message', data)
         self.assertFalse(data['available'])
-        self.assertIn('Ganesha config location is not configured. Please set the GANESHA_RADOS_POOL_NAMESPACE setting.',
+        self.assertIn(("NFS-Ganesha cluster is not detected. "
+                       "Please set the GANESHA_RADOS_POOL_NAMESPACE "
+                       "setting or deploy an NFS-Ganesha cluster with the Orchestrator."),
                       data['message'])
 
         self._ceph_cmd(['dashboard', 'set-ganesha-clusters-rados-pool-namespace', 'cluster1:ganesha/ganesha1,cluster2:ganesha/ganesha2'])
index 1b803906947cf2b2a1149347a817b085bd94f7d4..b9eb79b0240bf763d21bde36410541372b0fab7b 100644 (file)
@@ -241,30 +241,14 @@ class NFSGaneshaService(RESTController):
                  responses={200: [{
                      'daemon_id': (str, 'Daemon identifier'),
                      'cluster_id': (str, 'Cluster identifier'),
-                     'status': (int,
-                                'Status of daemon (1=RUNNING, 0=STOPPED, -1=ERROR',
-                                True),
-                     'desc': (str, 'Error description (if status==-1)', True)
+                     'cluster_type': (str, 'Cluster type'),
+                     'status': (int, 'Status of daemon', True),
+                     'desc': (str, 'Status description', True)
                  }]})
     def list(self):
-        status_dict = Ganesha.get_daemons_status()
-        if status_dict:
-            return [
-                {
-                    'daemon_id': daemon_id,
-                    'cluster_id': cluster_id,
-                    'status': status_dict[cluster_id][daemon_id]['status'],
-                    'desc': status_dict[cluster_id][daemon_id]['desc']
-                }
-                for cluster_id in status_dict
-                for daemon_id in status_dict[cluster_id]
-            ]
-
         result = []
         for cluster_id in Ganesha.get_ganesha_clusters():
-            result.extend(
-                [{'daemon_id': daemon_id, 'cluster_id': cluster_id}
-                 for daemon_id in GaneshaConf.instance(cluster_id).list_daemons()])
+            result.extend(GaneshaConf.instance(cluster_id).list_daemons())
         return result
 
 
index efcde8b6223a5f0256c5e942bb267594a4c6775d..ffe1798762538752d57d39a870ace6bd1e77a8fd 100644 (file)
@@ -4133,19 +4133,23 @@ paths:
                     cluster_id:
                       description: Cluster identifier
                       type: string
+                    cluster_type:
+                      description: Cluster type
+                      type: string
                     daemon_id:
                       description: Daemon identifier
                       type: string
                     desc:
-                      description: Error description (if status==-1)
+                      description: Status description
                       type: string
                     status:
-                      description: Status of daemon (1=RUNNING, 0=STOPPED, -1=ERROR
+                      description: Status of daemon
                       type: integer
                   type: object
                 required:
                 - daemon_id
                 - cluster_id
+                - cluster_type
                 type: array
           description: OK
         '400':
index 128d5981a13930978c43797653f128e3cca5a710..ba2ad7144d63f80699ef88e06605e8cdf97d368c 100644 (file)
@@ -5,8 +5,10 @@ from __future__ import absolute_import
 import logging
 import os
 import re
+from typing import Any, Dict, List, Optional, cast
 
-from orchestrator import OrchestratorError
+from ceph.deployment.service_spec import NFSServiceSpec
+from orchestrator import DaemonDescription, OrchestratorError, ServiceDescription
 
 from .. import mgr
 from ..exceptions import DashboardException
@@ -27,13 +29,19 @@ class NFSException(DashboardException):
 class Ganesha(object):
     @classmethod
     def _get_clusters_locations(cls):
-        result = {}  # type: ignore
+        # pylint: disable=too-many-branches
+        # Get Orchestrator clusters
+        orch_result = cls._get_orch_clusters_locations()
+
+        # Get user-defined clusters
         location_list_str = Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE
-        if not location_list_str:
-            raise NFSException("Ganesha config location is not configured. "
+        if not orch_result and not location_list_str:
+            raise NFSException("NFS-Ganesha cluster is not detected. "
                                "Please set the GANESHA_RADOS_POOL_NAMESPACE "
-                               "setting.")
-        location_list = [loc.strip() for loc in location_list_str.split(",")]
+                               "setting or deploy an NFS-Ganesha cluster with the Orchestrator.")
+        result = {}  # type: ignore
+        location_list = [loc.strip() for loc in location_list_str.split(
+            ",")] if location_list_str else []
         for location in location_list:
             cluster = None
             pool = None
@@ -61,42 +69,54 @@ class Ganesha(object):
                 else:
                     pool, namespace = pool_nm.split('/', 1)
 
+            if cluster in orch_result:
+                # cephadm might have set same cluster settings, ask the user to remove it.
+                raise NFSException(
+                    'Detected a conflicting NFS-Ganesha cluster name `{0}`. There exists an '
+                    'NFS-Ganesha cluster called `{0}` that is deployed by the Orchestrator. '
+                    'Please remove or rename the cluster from the GANESHA_RADOS_POOL_NAMESPACE '
+                    'setting.'.format(cluster))
+
             if cluster in result:
                 raise NFSException("Duplicate Ganesha cluster definition in "
                                    "the setting: {}".format(location_list_str))
-            result[cluster] = (pool, namespace)
+            result[cluster] = {
+                'pool': pool,
+                'namespace': namespace,
+                'type': ClusterType.USER,
+                'daemon_conf': None
+            }
+        return {**orch_result, **result}
 
-        return result
+    @classmethod
+    def _get_orch_clusters_locations(cls):
+        orch_result = {}  # type: ignore
+        services = cls._get_orch_nfs_services()
+        for service in services:
+            spec = cast(NFSServiceSpec, service.spec)
+            try:
+                orch_result[spec.service_id] = {
+                    'pool': spec.pool,
+                    'namespace': spec.namespace,
+                    'type': ClusterType.ORCHESTRATOR,
+                    'daemon_conf': spec.rados_config_name()
+                }
+            except AttributeError as ex:
+                logger.warning('Error when getting NFS service from the Orchestrator. %s', str(ex))
+                continue
+        return orch_result
 
     @classmethod
     def get_ganesha_clusters(cls):
         return list(cls._get_clusters_locations())
 
     @staticmethod
-    def _get_orch_nfs_instances():
+    def _get_orch_nfs_services() -> List[ServiceDescription]:
         try:
-            return OrchClient.instance().services.list("nfs")
+            return OrchClient.instance().services.list('nfs')
         except (RuntimeError, OrchestratorError, ImportError):
             return []
 
-    @classmethod
-    def get_daemons_status(cls):
-        instances = cls._get_orch_nfs_instances()
-        if not instances:
-            return None
-
-        result = {}  # type: ignore
-        for instance in instances:
-            if instance.service is None:
-                instance.service = "_default_"
-            if instance.service not in result:
-                result[instance.service] = {}
-            result[instance.service][instance.hostname] = {
-                'status': instance.status,
-                'desc': instance.status_desc,
-            }
-        return result
-
     @classmethod
     def parse_rados_url(cls, rados_url):
         if not rados_url.startswith("rados://"):
@@ -118,38 +138,13 @@ class Ganesha(object):
         return "rados://{}/{}".format(pool, obj)
 
     @classmethod
-    def get_pool_and_namespace(cls, cluster_id):
-        instances = cls._get_orch_nfs_instances()
-        # we assume that every instance stores there configuration in the
-        # same RADOS pool/namespace
-        if instances:
-            location = instances[0].rados_config_location
-            pool, ns, _ = cls.parse_rados_url(location)
-            return pool, ns
+    def get_cluster(cls, cluster_id):
         locations = cls._get_clusters_locations()
         if cluster_id not in locations:
             raise NFSException("Cluster not found: cluster_id={}"
                                .format(cluster_id))
         return locations[cluster_id]
 
-    @classmethod
-    def reload_daemons(cls, cluster_id, daemons_id):
-        logger.debug("issued reload of daemons: %s", daemons_id)
-        if not OrchClient.instance().available():
-            logger.debug("orchestrator not available")
-            return
-        reload_list = []
-        daemons = cls.get_daemons_status()
-        if cluster_id not in daemons:
-            raise NFSException("Cluster not found: cluster_id={}"
-                               .format(cluster_id))
-        for daemon_id in daemons_id:
-            if daemon_id not in daemons[cluster_id]:
-                continue
-            if daemons[cluster_id][daemon_id] == 1:
-                reload_list.append((cluster_id, daemon_id))
-        OrchClient.instance().services.reload("nfs", reload_list)
-
     @classmethod
     def fsals_available(cls):
         result = []
@@ -596,13 +591,8 @@ class Export(object):
         self.transports = set(transports)
         self.clients = clients
 
-    def validate(self, daemons_list):
+    def validate(self):
         # pylint: disable=R0912
-        for daemon_id in self.daemons:
-            if daemon_id not in daemons_list:
-                raise NFSException("Daemon '{}' does not exist"
-                                   .format(daemon_id))
-
         if not self.fsal.validate_path(self.path):
             raise NFSException("Export path ({}) is invalid.".format(self.path))
 
@@ -752,13 +742,24 @@ class Export(object):
         }
 
 
+class ClusterType(object):
+
+    # Ganesha clusters deployed by the Orchestrator.
+    ORCHESTRATOR = 'orchestrator'
+
+    # Ganesha clusters deployed manually by the user. Specified by using the
+    # GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE setting.
+    USER = 'user'
+
+
 class GaneshaConf(object):
     # pylint: disable=R0902
 
-    def __init__(self, cluster_id, rados_pool, rados_namespace):
+    def __init__(self, cluster_id, rados_pool, rados_namespace, daemon_confs=None):
         self.cluster_id = cluster_id
         self.rados_pool = rados_pool
         self.rados_namespace = rados_namespace
+        self.daemon_confs = daemon_confs if daemon_confs is not None else []
         self.export_conf_blocks = []  # type: ignore
         self.daemons_conf_blocks = {}  # type: ignore
         self._defaults = {}
@@ -779,39 +780,43 @@ class GaneshaConf(object):
             self.exports[export.export_id] = export
 
         # link daemons to exports
-        for daemon_id, daemon_blocks in self.daemons_conf_blocks.items():
-            for block in daemon_blocks:
-                if block['block_name'] == "%url":
-                    rados_url = block['value']
-                    _, _, obj = Ganesha.parse_rados_url(rados_url)
-                    if obj.startswith("export-"):
-                        export_id = int(obj[obj.find('-')+1:])
-                        self.exports[export_id].daemons.add(daemon_id)
+        self._link_daemons_to_exports()
+
+    def _link_daemons_to_exports(self):
+        raise NotImplementedError()
 
     @classmethod
     def instance(cls, cluster_id):
-        pool, ns = Ganesha.get_pool_and_namespace(cluster_id)
-        return cls(cluster_id, pool, ns)
+        cluster = Ganesha.get_cluster(cluster_id)
+        if cluster['type'] == ClusterType.ORCHESTRATOR:
+            return GaneshaConfOrchestrator(cluster_id, cluster['pool'], cluster['namespace'],
+                                           [cluster['daemon_conf']])
+        if cluster['type'] == ClusterType.USER:
+            return GaneshaConfUser(cluster_id, cluster['pool'], cluster['namespace'])
+        raise NFSException('Unknown cluster type `{}` for cluster `{}`'.format(
+            cluster['type'], cluster_id))
 
     def _read_raw_config(self):
+
+        def _read_rados_obj(_obj):
+            size, _ = _obj.stat()
+            return _obj.read(size).decode("utf-8")
+
         with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
             if self.rados_namespace:
                 ioctx.set_namespace(self.rados_namespace)
             objs = ioctx.list_objects()
             for obj in objs:
                 if obj.key.startswith("export-"):
-                    size, _ = obj.stat()
-                    raw_config = obj.read(size)
-                    raw_config = raw_config.decode("utf-8")
+                    raw_config = _read_rados_obj(obj)
                     logger.debug("read export configuration from rados "
                                  "object %s/%s/%s:\n%s", self.rados_pool,
                                  self.rados_namespace, obj.key, raw_config)
                     self.export_conf_blocks.extend(
                         GaneshaConfParser(raw_config).parse())
-                elif obj.key.startswith("conf-"):
-                    size, _ = obj.stat()
-                    raw_config = obj.read(size)
-                    raw_config = raw_config.decode("utf-8")
+                elif not self.daemon_confs and obj.key.startswith("conf-"):
+                    # Read all `conf-xxx` for daemon configs.
+                    raw_config = _read_rados_obj(obj)
                     logger.debug("read daemon configuration from rados "
                                  "object %s/%s/%s:\n%s", self.rados_pool,
                                  self.rados_namespace, obj.key, raw_config)
@@ -819,6 +824,17 @@ class GaneshaConf(object):
                     self.daemons_conf_blocks[obj.key[idx+1:]] = \
                         GaneshaConfParser(raw_config).parse()
 
+            if self.daemon_confs:
+                # When daemon configs are provided.
+                for conf in self.daemon_confs:
+                    size, _ = ioctx.stat(conf)
+                    raw_config = ioctx.read(conf, size).decode("utf-8")
+                    logger.debug("read daemon configuration from rados "
+                                 "object %s/%s/%s:\n%s", self.rados_pool,
+                                 self.rados_namespace, conf, raw_config)
+                    self.daemons_conf_blocks[conf] = \
+                        GaneshaConfParser(raw_config).parse()
+
     def _write_raw_config(self, conf_block, obj):
         raw_config = GaneshaConfParser.write_conf(conf_block)
         with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
@@ -876,8 +892,8 @@ class GaneshaConf(object):
                 path = path[:-1]
         return path
 
-    def validate(self, export):
-        export.validate(self.list_daemons())
+    def validate(self, export: Export):
+        export.validate()
 
         if 4 in export.protocols:  # NFSv4 protocol
             len_prefix = 1
@@ -928,20 +944,7 @@ class GaneshaConf(object):
         return nid
 
     def _persist_daemon_configuration(self):
-        daemon_map = {}  # type: ignore
-        for daemon_id in self.list_daemons():
-            daemon_map[daemon_id] = []
-
-        for _, ex in self.exports.items():
-            for daemon in ex.daemons:
-                daemon_map[daemon].append({
-                    'block_name': "%url",
-                    'value': Ganesha.make_rados_url(
-                        self.rados_pool, self.rados_namespace,
-                        "export-{}".format(ex.export_id))
-                })
-        for daemon_id, conf_blocks in daemon_map.items():
-            self._write_raw_config(conf_blocks, "conf-{}".format(daemon_id))
+        raise NotImplementedError()
 
     def _save_export(self, export):
         self.validate(export)
@@ -994,8 +997,11 @@ class GaneshaConf(object):
             return self.exports[export_id]
         return None
 
-    def list_daemons(self):
-        return list(self.daemons_conf_blocks)
+    def list_daemons(self) -> List[Dict[str, Any]]:
+        raise NotImplementedError()
+
+    def list_daemon_confs(self):
+        return self.daemons_conf_blocks.keys()
 
     def reload_daemons(self, daemons):
         with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
@@ -1003,3 +1009,111 @@ class GaneshaConf(object):
                 ioctx.set_namespace(self.rados_namespace)
             for daemon_id in daemons:
                 ioctx.notify("conf-{}".format(daemon_id))
+
+
+class GaneshaConfOrchestrator(GaneshaConf):
+    @classmethod
+    def _get_orch_nfs_instances(cls,
+                                service_name: Optional[str] = None) -> List[DaemonDescription]:
+        try:
+            return OrchClient.instance().services.\
+                list_daemons(service_name=service_name, daemon_type="nfs")
+        except (RuntimeError, OrchestratorError, ImportError):
+            return []
+
+    def _link_daemons_to_exports(self):
+        instances = self._get_orch_nfs_instances('nfs.{}'.format(self.cluster_id))
+        daemon_ids = {instance.daemon_id for instance in instances}
+        for _, daemon_blocks in self.daemons_conf_blocks.items():
+            for block in daemon_blocks:
+                if block['block_name'] == "%url":
+                    rados_url = block['value']
+                    _, _, obj = Ganesha.parse_rados_url(rados_url)
+                    if obj.startswith("export-"):
+                        export_id = int(obj[obj.find('-')+1:])
+                        self.exports[export_id].daemons.update(daemon_ids)
+
+    def validate(self, export: Export):
+        daemons_list = {d['daemon_id'] for d in self.list_daemons()}
+        if export.daemons and set(export.daemons) != daemons_list:
+            raise NFSException('Export should be linked to all daemons.')
+        super().validate(export)
+
+    def _persist_daemon_configuration(self):
+        daemon_map = {}  # type: ignore
+        for daemon_id in self.list_daemon_confs():
+            daemon_map[daemon_id] = []
+
+        for daemon_id in self.list_daemon_confs():
+            for _, ex in self.exports.items():
+                if ex.daemons:
+                    daemon_map[daemon_id].append({
+                        'block_name': "%url",
+                        'value': Ganesha.make_rados_url(
+                            self.rados_pool, self.rados_namespace,
+                            "export-{}".format(ex.export_id))
+                    })
+        for daemon_id, conf_blocks in daemon_map.items():
+            self._write_raw_config(conf_blocks, daemon_id)
+
+    def list_daemons(self) -> List[Dict[str, Any]]:
+        instances = self._get_orch_nfs_instances('nfs.{}'.format(self.cluster_id))
+        return [{
+            'cluster_id': self.cluster_id,
+            'daemon_id': instance.daemon_id,
+            'cluster_type': ClusterType.ORCHESTRATOR,
+            'status': instance.status,
+            'status_desc': instance.status_desc
+        } for instance in instances]
+
+    def reload_daemons(self, daemons):
+        with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
+            if self.rados_namespace:
+                ioctx.set_namespace(self.rados_namespace)
+            for daemon_id in self.list_daemon_confs():
+                ioctx.notify(daemon_id)
+
+
+class GaneshaConfUser(GaneshaConf):
+
+    def _link_daemons_to_exports(self):
+        for daemon_id, daemon_blocks in self.daemons_conf_blocks.items():
+            for block in daemon_blocks:
+                if block['block_name'] == "%url":
+                    rados_url = block['value']
+                    _, _, obj = Ganesha.parse_rados_url(rados_url)
+                    if obj.startswith("export-"):
+                        export_id = int(obj[obj.find('-')+1:])
+                        self.exports[export_id].daemons.add(daemon_id)
+
+    def validate(self, export: Export):
+        daemons_list = [d['daemon_id'] for d in self.list_daemons()]
+        for daemon_id in export.daemons:
+            if daemon_id not in daemons_list:
+                raise NFSException("Daemon '{}' does not exist".format(daemon_id))
+        super().validate(export)
+
+    def _persist_daemon_configuration(self):
+        daemon_map = {}  # type: ignore
+        for daemon_id in self.list_daemon_confs():
+            daemon_map[daemon_id] = []
+
+        for _, ex in self.exports.items():
+            for daemon in ex.daemons:
+                daemon_map[daemon].append({
+                    'block_name': "%url",
+                    'value': Ganesha.make_rados_url(
+                        self.rados_pool, self.rados_namespace,
+                        "export-{}".format(ex.export_id))
+                })
+        for daemon_id, conf_blocks in daemon_map.items():
+            self._write_raw_config(conf_blocks, "conf-{}".format(daemon_id))
+
+    def list_daemons(self) -> List[Dict[str, Any]]:
+        return [{
+            'cluster_id': self.cluster_id,
+            'cluster_type': ClusterType.USER,
+            'daemon_id': daemon_id,
+            'status': 1,
+            'status_desc': 'running'
+        } for daemon_id in self.list_daemon_confs()]
index 2151d5269a0b2629407118f5a9ad692c65c92205..258bd9da6dd35f77531c28948e9c36e0e9a6a10e 100644 (file)
@@ -2,19 +2,16 @@
 from __future__ import absolute_import
 
 import unittest
+from unittest.mock import MagicMock, Mock, patch
 from urllib.parse import urlencode
 
-try:
-    from mock import MagicMock, Mock, patch
-except ImportError:
-    from unittest.mock import MagicMock, Mock, patch
-
-import orchestrator
+from ceph.deployment.service_spec import NFSServiceSpec
+from orchestrator import DaemonDescription, ServiceDescription
 
 from .. import mgr
 from ..controllers.nfsganesha import NFSGaneshaUi
 from ..services import ganesha
-from ..services.ganesha import Export, GaneshaConf, GaneshaConfParser
+from ..services.ganesha import ClusterType, Export, GaneshaConf, GaneshaConfParser, NFSException
 from ..settings import Settings
 from . import ControllerTestCase  # pylint: disable=no-name-in-module
 from . import KVStoreMockMixin  # pylint: disable=no-name-in-module
@@ -82,12 +79,17 @@ EXPORT
 """
 
     conf_nodea = '''
-%url rados://ganesha/ns/export-2
+%url "rados://ganesha/ns/export-2"
 
 %url "rados://ganesha/ns/export-1"'''
 
     conf_nodeb = '%url "rados://ganesha/ns/export-1"'
 
+    conf_nfs_foo = '''
+%url "rados://ganesha2/ns2/export-1"
+
+%url "rados://ganesha2/ns2/export-2"'''
+
     class RObject(object):
         def __init__(self, key, raw):
             self.key = key
@@ -100,31 +102,67 @@ EXPORT
             return len(self.raw), None
 
     def _ioctx_write_full_mock(self, key, content):
-        if key not in self.temp_store:
-            self.temp_store[key] = GaneshaConfTest.RObject(key,
-                                                           content.decode('utf-8'))
+        if key not in self.temp_store[self.temp_store_namespace]:
+            self.temp_store[self.temp_store_namespace][key] = \
+                GaneshaConfTest.RObject(key, content.decode('utf-8'))
         else:
-            self.temp_store[key].raw = content.decode('utf-8')
+            self.temp_store[self.temp_store_namespace][key].raw = content.decode('utf-8')
 
     def _ioctx_remove_mock(self, key):
-        del self.temp_store[key]
+        del self.temp_store[self.temp_store_namespace][key]
 
     def _ioctx_list_objects_mock(self):
-        return [obj for _, obj in self.temp_store.items()]
+        return [obj for _, obj in self.temp_store[self.temp_store_namespace].items()]
+
+    def _ioctl_stat_mock(self, key):
+        return self.temp_store[self.temp_store_namespace][key].stat()
+
+    def _ioctl_read_mock(self, key, size):
+        return self.temp_store[self.temp_store_namespace][key].read(size)
+
+    def _ioctx_set_namespace_mock(self, namespace):
+        self.temp_store_namespace = namespace
 
     def setUp(self):
         self.mock_kv_store()
 
-        Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE = "ganesha/ns"
-
-        self.temp_store = {
-            'export-1': GaneshaConfTest.RObject("export-1", self.export_1),
-            'conf-nodea': GaneshaConfTest.RObject("conf-nodea", self.conf_nodea),
-            'export-2': GaneshaConfTest.RObject("export-2", self.export_2),
-            'conf-nodeb': GaneshaConfTest.RObject("conf-nodeb", self.conf_nodeb)
+        self.clusters = {
+            '_default_': {
+                'pool': 'ganesha',
+                'namespace': 'ns',
+                'type': ClusterType.USER,
+                'daemon_conf': None,
+                'daemons': ['nodea', 'nodeb'],
+                'exports': {
+                    1: ['nodea', 'nodeb'],
+                    2: ['nodea'],
+                    3: ['nodea', 'nodeb']  # for new-added export
+                }
+            },
+            'foo': {
+                'pool': 'ganesha2',
+                'namespace': 'ns2',
+                'type': ClusterType.ORCHESTRATOR,
+                'daemon_conf': 'conf-nfs.foo',
+                'daemons': ['foo.host_a', 'foo.host_b'],
+                'exports': {
+                    1: ['foo.host_a', 'foo.host_b'],
+                    2: ['foo.host_a', 'foo.host_b'],
+                    3: ['foo.host_a', 'foo.host_b']  # for new-added export
+                }
+            }
         }
 
+        Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE = '{pool}/{namespace}'.format_map(
+            self.clusters['_default_'])
+
+        self.temp_store_namespace = None
+        self._reset_temp_store()
+
         self.io_mock = MagicMock()
+        self.io_mock.set_namespace.side_effect = self._ioctx_set_namespace_mock
+        self.io_mock.read = self._ioctl_read_mock
+        self.io_mock.stat = self._ioctl_stat_mock
         self.io_mock.list_objects.side_effect = self._ioctx_list_objects_mock
         self.io_mock.write_full.side_effect = self._ioctx_write_full_mock
         self.io_mock.remove_object.side_effect = self._ioctx_remove_mock
@@ -136,8 +174,7 @@ EXPORT
         mgr.rados = MagicMock()
         mgr.rados.open_ioctx.return_value = ioctx_mock
 
-        # pylint: disable=protected-access
-        mgr._select_orchestrator.side_effect = orchestrator.NoOrchestrator()
+        self._mock_orchestrator(True)
 
         ganesha.CephX = MagicMock()
         ganesha.CephX.list_clients.return_value = ['ganesha']
@@ -145,6 +182,55 @@ EXPORT
 
         ganesha.CephFS = MagicMock()
 
+    def _reset_temp_store(self):
+        self.temp_store_namespace = None
+        self.temp_store = {
+            'ns': {
+                'export-1': GaneshaConfTest.RObject("export-1", self.export_1),
+                'export-2': GaneshaConfTest.RObject("export-2", self.export_2),
+                'conf-nodea': GaneshaConfTest.RObject("conf-nodea", self.conf_nodea),
+                'conf-nodeb': GaneshaConfTest.RObject("conf-nodeb", self.conf_nodeb),
+            },
+            'ns2': {
+                'export-1': GaneshaConfTest.RObject("export-1", self.export_1),
+                'export-2': GaneshaConfTest.RObject("export-2", self.export_2),
+                'conf-nfs.foo': GaneshaConfTest.RObject("conf-nfs.foo", self.conf_nfs_foo)
+            }
+        }
+
+    def _mock_orchestrator(self, enable):
+        # mock nfs services
+        cluster_info = self.clusters['foo']
+        orch_nfs_services = [
+            ServiceDescription(spec=NFSServiceSpec(service_id='foo',
+                                                   pool=cluster_info['pool'],
+                                                   namespace=cluster_info['namespace']))
+        ] if enable else []
+        # pylint: disable=protected-access
+        ganesha.Ganesha._get_orch_nfs_services = Mock(return_value=orch_nfs_services)
+
+        # mock nfs daemons
+        def _get_nfs_instances(service_name=None):
+            if not enable:
+                return []
+            instances = {
+                'nfs.foo': [
+                    DaemonDescription(daemon_id='foo.host_a', status=1),
+                    DaemonDescription(daemon_id='foo.host_b', status=1)
+                ],
+                'nfs.bar': [
+                    DaemonDescription(daemon_id='bar.host_c', status=1)
+                ]
+            }
+            if service_name is not None:
+                return instances[service_name]
+            result = []
+            for _, daemons in instances.items():
+                result.extend(daemons)
+            return result
+        ganesha.GaneshaConfOrchestrator._get_orch_nfs_instances = Mock(
+            side_effect=_get_nfs_instances)
+
     def test_export_parser_1(self):
         blocks = GaneshaConfParser(self.export_1).parse()
         self.assertIsInstance(blocks, list)
@@ -215,7 +301,12 @@ EXPORT
         self.assertEqual(blocks[0]['value'], "rados://ganesha/ns/export-1")
 
     def test_ganesha_conf(self):
-        ganesha_conf = GaneshaConf.instance('_default_')
+        for cluster_id, info in self.clusters.items():
+            self._do_test_ganesha_conf(cluster_id, info['exports'])
+            self._reset_temp_store()
+
+    def _do_test_ganesha_conf(self, cluster, expected_exports):
+        ganesha_conf = GaneshaConf.instance(cluster)
         exports = ganesha_conf.exports
 
         self.assertEqual(len(exports.items()), 2)
@@ -246,6 +337,7 @@ EXPORT
         self.assertEqual(export.clients[1].access_type, "RO")
         self.assertEqual(export.attr_expiration_time, 0)
         self.assertEqual(export.security_label, False)
+        self.assertSetEqual(export.daemons, set(expected_exports[1]))
 
         # export_id = 2 asserts
         export = exports[2]
@@ -262,17 +354,23 @@ EXPORT
         self.assertEqual(export.fsal.access_key, "access_key")
         self.assertEqual(export.fsal.secret_key, "secret_key")
         self.assertEqual(len(export.clients), 0)
+        self.assertSetEqual(export.daemons, set(expected_exports[2]))
 
     def test_config_dict(self):
-        conf = GaneshaConf.instance('_default_')
+        for cluster_id, info in self.clusters.items():
+            self._do_test_config_dict(cluster_id, info['exports'])
+            self._reset_temp_store()
+
+    def _do_test_config_dict(self, cluster, expected_exports):
+        conf = GaneshaConf.instance(cluster)
         export = conf.exports[1]
         ex_dict = export.to_dict()
         self.assertDictEqual(ex_dict, {
-            'daemons': ['nodea', 'nodeb'],
+            'daemons': expected_exports[1],
             'export_id': 1,
             'path': '/',
             'pseudo': '/cephfs_a',
-            'cluster_id': '_default_',
+            'cluster_id': cluster,
             'tag': None,
             'access_type': 'RW',
             'squash': 'root_squash',
@@ -299,11 +397,11 @@ EXPORT
         export = conf.exports[2]
         ex_dict = export.to_dict()
         self.assertDictEqual(ex_dict, {
-            'daemons': ['nodea'],
+            'daemons': expected_exports[2],
             'export_id': 2,
             'path': '/',
             'pseudo': '/rgw',
-            'cluster_id': '_default_',
+            'cluster_id': cluster,
             'tag': None,
             'access_type': 'RW',
             'squash': 'all_squash',
@@ -318,11 +416,16 @@ EXPORT
         })
 
     def test_config_from_dict(self):
+        for cluster_id, info in self.clusters.items():
+            self._do_test_config_from_dict(cluster_id, info['exports'])
+            self._reset_temp_store()
+
+    def _do_test_config_from_dict(self, cluster_id, expected_exports):
         export = Export.from_dict(1, {
-            'daemons': ['nodea', 'nodeb'],
+            'daemons': expected_exports[1],
             'export_id': 1,
             'path': '/',
-            'cluster_id': '_default_',
+            'cluster_id': cluster_id,
             'pseudo': '/cephfs_a',
             'tag': None,
             'access_type': 'RW',
@@ -367,17 +470,17 @@ EXPORT
         self.assertEqual(export.clients[1].addresses, ["192.168.0.0/16"])
         self.assertEqual(export.clients[1].squash, "all_squash")
         self.assertEqual(export.clients[1].access_type, "RO")
-        self.assertEqual(export.daemons, {"nodeb", "nodea"})
-        self.assertEqual(export.cluster_id, '_default_')
+        self.assertEqual(export.daemons, set(expected_exports[1]))
+        self.assertEqual(export.cluster_id, cluster_id)
         self.assertEqual(export.attr_expiration_time, 0)
         self.assertEqual(export.security_label, True)
 
         export = Export.from_dict(2, {
-            'daemons': ['nodea'],
+            'daemons': expected_exports[2],
             'export_id': 2,
             'path': '/',
             'pseudo': '/rgw',
-            'cluster_id': '_default_',
+            'cluster_id': cluster_id,
             'tag': None,
             'access_type': 'RW',
             'squash': 'all_squash',
@@ -404,16 +507,21 @@ EXPORT
         self.assertIsNone(export.fsal.access_key)
         self.assertIsNone(export.fsal.secret_key)
         self.assertEqual(len(export.clients), 0)
-        self.assertEqual(export.daemons, {"nodea"})
-        self.assertEqual(export.cluster_id, '_default_')
+        self.assertEqual(export.daemons, set(expected_exports[2]))
+        self.assertEqual(export.cluster_id, cluster_id)
 
     def test_gen_raw_config(self):
-        conf = GaneshaConf.instance('_default_')
+        for cluster_id, info in self.clusters.items():
+            self._do_test_gen_raw_config(cluster_id, info['exports'])
+            self._reset_temp_store()
+
+    def _do_test_gen_raw_config(self, cluster_id, expected_exports):
+        conf = GaneshaConf.instance(cluster_id)
         # pylint: disable=W0212
         export = conf.exports[1]
         del conf.exports[1]
         conf._save_export(export)
-        conf = GaneshaConf.instance('_default_')
+        conf = GaneshaConf.instance(cluster_id)
         exports = conf.exports
         self.assertEqual(len(exports.items()), 2)
         self.assertIn(1, exports)
@@ -441,8 +549,8 @@ EXPORT
         self.assertEqual(export.clients[1].addresses, ["192.168.0.0/16"])
         self.assertEqual(export.clients[1].squash, "all_squash")
         self.assertEqual(export.clients[1].access_type, "RO")
-        self.assertEqual(export.daemons, {"nodeb", "nodea"})
-        self.assertEqual(export.cluster_id, '_default_')
+        self.assertEqual(export.daemons, set(expected_exports[1]))
+        self.assertEqual(export.cluster_id, cluster_id)
         self.assertEqual(export.attr_expiration_time, 0)
         self.assertEqual(export.security_label, False)
 
@@ -461,10 +569,15 @@ EXPORT
         self.assertEqual(export.fsal.access_key, "access_key")
         self.assertEqual(export.fsal.secret_key, "secret_key")
         self.assertEqual(len(export.clients), 0)
-        self.assertEqual(export.daemons, {"nodea"})
-        self.assertEqual(export.cluster_id, '_default_')
+        self.assertEqual(export.daemons, set(expected_exports[2]))
+        self.assertEqual(export.cluster_id, cluster_id)
 
     def test_update_export(self):
+        for cluster_id, info in self.clusters.items():
+            self._do_test_update_export(cluster_id, info['exports'])
+            self._reset_temp_store()
+
+    def _do_test_update_export(self, cluster_id, expected_exports):
         ganesha.RgwClient = MagicMock()
         admin_inst_mock = MagicMock()
         admin_inst_mock.get_user_keys.return_value = {
@@ -473,13 +586,13 @@ EXPORT
         }
         ganesha.RgwClient.admin_instance.return_value = admin_inst_mock
 
-        conf = GaneshaConf.instance('_default_')
+        conf = GaneshaConf.instance(cluster_id)
         conf.update_export({
             'export_id': 2,
-            'daemons': ["nodeb"],
+            'daemons': expected_exports[2],
             'path': 'bucket',
             'pseudo': '/rgw/bucket',
-            'cluster_id': '_default_',
+            'cluster_id': cluster_id,
             'tag': 'bucket_tag',
             'access_type': 'RW',
             'squash': 'all_squash',
@@ -497,7 +610,7 @@ EXPORT
             }
         })
 
-        conf = GaneshaConf.instance('_default_')
+        conf = GaneshaConf.instance(cluster_id)
         export = conf.get_export(2)
         self.assertEqual(export.export_id, 2)
         self.assertEqual(export.path, "bucket")
@@ -515,11 +628,16 @@ EXPORT
         self.assertEqual(export.clients[0].addresses, ["192.168.0.0/16"])
         self.assertIsNone(export.clients[0].squash)
         self.assertIsNone(export.clients[0].access_type)
-        self.assertEqual(export.daemons, {"nodeb"})
-        self.assertEqual(export.cluster_id, '_default_')
+        self.assertEqual(export.daemons, set(expected_exports[2]))
+        self.assertEqual(export.cluster_id, cluster_id)
 
     def test_remove_export(self):
-        conf = GaneshaConf.instance('_default_')
+        for cluster_id, info in self.clusters.items():
+            self._do_test_remove_export(cluster_id, info['exports'])
+            self._reset_temp_store()
+
+    def _do_test_remove_export(self, cluster_id, expected_exports):
+        conf = GaneshaConf.instance(cluster_id)
         conf.remove_export(1)
         exports = conf.list_exports()
         self.assertEqual(len(exports), 1)
@@ -538,10 +656,15 @@ EXPORT
         self.assertEqual(export.fsal.access_key, "access_key")
         self.assertEqual(export.fsal.secret_key, "secret_key")
         self.assertEqual(len(export.clients), 0)
-        self.assertEqual(export.daemons, {"nodea"})
-        self.assertEqual(export.cluster_id, '_default_')
+        self.assertEqual(export.daemons, set(expected_exports[2]))
+        self.assertEqual(export.cluster_id, cluster_id)
 
     def test_create_export_rgw(self):
+        for cluster_id, info in self.clusters.items():
+            self._do_test_create_export_rgw(cluster_id, info['exports'])
+            self._reset_temp_store()
+
+    def _do_test_create_export_rgw(self, cluster_id, expected_exports):
         ganesha.RgwClient = MagicMock()
         admin_inst_mock = MagicMock()
         admin_inst_mock.get_user_keys.return_value = {
@@ -550,13 +673,13 @@ EXPORT
         }
         ganesha.RgwClient.admin_instance.return_value = admin_inst_mock
 
-        conf = GaneshaConf.instance('_default_')
+        conf = GaneshaConf.instance(cluster_id)
         ex_id = conf.create_export({
-            'daemons': ["nodeb"],
+            'daemons': expected_exports[3],
             'path': 'bucket',
             'pseudo': '/rgw/bucket',
             'tag': 'bucket_tag',
-            'cluster_id': '_default_',
+            'cluster_id': cluster_id,
             'access_type': 'RW',
             'squash': 'all_squash',
             'security_label': False,
@@ -573,7 +696,7 @@ EXPORT
             }
         })
 
-        conf = GaneshaConf.instance('_default_')
+        conf = GaneshaConf.instance(cluster_id)
         exports = conf.list_exports()
         self.assertEqual(len(exports), 3)
         export = conf.get_export(ex_id)
@@ -593,10 +716,15 @@ EXPORT
         self.assertEqual(export.clients[0].addresses, ["192.168.0.0/16"])
         self.assertIsNone(export.clients[0].squash)
         self.assertIsNone(export.clients[0].access_type)
-        self.assertEqual(export.daemons, {"nodeb"})
-        self.assertEqual(export.cluster_id, '_default_')
+        self.assertEqual(export.daemons, set(expected_exports[3]))
+        self.assertEqual(export.cluster_id, cluster_id)
 
     def test_create_export_cephfs(self):
+        for cluster_id, info in self.clusters.items():
+            self._do_test_create_export_cephfs(cluster_id, info['exports'])
+            self._reset_temp_store()
+
+    def _do_test_create_export_cephfs(self, cluster_id, expected_exports):
         ganesha.CephX = MagicMock()
         ganesha.CephX.list_clients.return_value = ["fs"]
         ganesha.CephX.get_client_key.return_value = "fs_key"
@@ -604,12 +732,12 @@ EXPORT
         ganesha.CephFS = MagicMock()
         ganesha.CephFS.dir_exists.return_value = True
 
-        conf = GaneshaConf.instance('_default_')
+        conf = GaneshaConf.instance(cluster_id)
         ex_id = conf.create_export({
-            'daemons': ['nodea', 'nodeb'],
+            'daemons': expected_exports[3],
             'path': '/',
             'pseudo': '/cephfs2',
-            'cluster_id': '_default_',
+            'cluster_id': cluster_id,
             'tag': None,
             'access_type': 'RW',
             'squash': 'all_squash',
@@ -625,7 +753,7 @@ EXPORT
             }
         })
 
-        conf = GaneshaConf.instance('_default_')
+        conf = GaneshaConf.instance(cluster_id)
         exports = conf.list_exports()
         self.assertEqual(len(exports), 3)
         export = conf.get_export(ex_id)
@@ -643,11 +771,121 @@ EXPORT
         self.assertEqual(export.fsal.sec_label_xattr, "security.selinux")
         self.assertIsNone(export.fsal.fs_name)
         self.assertEqual(len(export.clients), 0)
-        self.assertEqual(export.daemons, {"nodeb", "nodea"})
-        self.assertEqual(export.cluster_id, '_default_')
+        self.assertEqual(export.daemons, set(expected_exports[3]))
+        self.assertEqual(export.cluster_id, cluster_id)
         self.assertEqual(export.attr_expiration_time, 0)
         self.assertEqual(export.security_label, True)
 
+    def test_reload_daemons(self):
+        # Fail to import call in Python 3.8, see https://bugs.python.org/issue35753
+        mock_call = unittest.mock.call
+
+        # Orchestrator cluster: reload all daemon config objects.
+        conf = GaneshaConf.instance('foo')
+        calls = [mock_call(conf) for conf in conf.list_daemon_confs()]
+        for daemons in [[], ['a', 'b']]:
+            conf.reload_daemons(daemons)
+            self.io_mock.notify.assert_has_calls(calls)
+            self.io_mock.reset_mock()
+
+        # User-defined cluster: reload daemons in the parameter
+        conf = GaneshaConf.instance('_default_')
+        calls = [mock_call('conf-{}'.format(daemon)) for daemon in ['nodea', 'nodeb']]
+        conf.reload_daemons(['nodea', 'nodeb'])
+        self.io_mock.notify.assert_has_calls(calls)
+
+    def test_list_daemons(self):
+        for cluster_id, info in self.clusters.items():
+            instance = GaneshaConf.instance(cluster_id)
+            daemons = instance.list_daemons()
+            for daemon in daemons:
+                self.assertEqual(daemon['cluster_id'], cluster_id)
+                self.assertEqual(daemon['cluster_type'], info['type'])
+                self.assertIn('daemon_id', daemon)
+                self.assertIn('status', daemon)
+                self.assertIn('status_desc', daemon)
+            self.assertEqual([daemon['daemon_id'] for daemon in daemons], info['daemons'])
+
+    def test_validate_orchestrator(self):
+        cluster_id = 'foo'
+        cluster_info = self.clusters[cluster_id]
+        instance = GaneshaConf.instance(cluster_id)
+        export = MagicMock()
+
+        # export can be linked to none or all daemons
+        export_daemons = [[], cluster_info['daemons']]
+        for daemons in export_daemons:
+            export.daemons = daemons
+            instance.validate(export)
+
+        # raise if linking to partial or non-exist daemons
+        export_daemons = [cluster_info['daemons'][:1], 'xxx']
+        for daemons in export_daemons:
+            with self.assertRaises(NFSException):
+                export.daemons = daemons
+                instance.validate(export)
+
+    def test_validate_user(self):
+        cluster_id = '_default_'
+        cluster_info = self.clusters[cluster_id]
+        instance = GaneshaConf.instance(cluster_id)
+        export = MagicMock()
+
+        # export can be linked to none, partial, or all daemons
+        export_daemons = [[], cluster_info['daemons'][:1], cluster_info['daemons']]
+        for daemons in export_daemons:
+            export.daemons = daemons
+            instance.validate(export)
+
+        # raise if linking to non-exist daemons
+        export_daemons = ['xxx']
+        for daemons in export_daemons:
+            with self.assertRaises(NFSException):
+                export.daemons = daemons
+                instance.validate(export)
+
+    def _verify_locations(self, locations, cluster_ids):
+        for cluster_id in cluster_ids:
+            self.assertIn(cluster_id, locations)
+            cluster = locations.pop(cluster_id)
+            expected_info = self.clusters[cluster_id]
+            self.assertDictEqual(cluster, {key: expected_info[key] for key in [
+                'pool', 'namespace', 'type', 'daemon_conf']})
+        self.assertDictEqual(locations, {})
+
+    def test_get_cluster_locations(self):
+        # pylint: disable=protected-access
+        # There are both Orchstrator cluster and user-defined cluster.
+        locations = ganesha.Ganesha._get_clusters_locations()
+        self._verify_locations(locations, self.clusters.keys())
+
+        # There is only a user-defined cluster.
+        self._mock_orchestrator(False)
+        locations = ganesha.Ganesha._get_clusters_locations()
+        self._verify_locations(locations, ['_default_'])
+        self._mock_orchestrator(True)
+
+        # There is only a Orchstrator cluster.
+        Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE = ''
+        locations = ganesha.Ganesha._get_clusters_locations()
+        self._verify_locations(locations, ['foo'])
+
+        # No cluster.
+        self._mock_orchestrator(False)
+        with self.assertRaises(NFSException):
+            ganesha.Ganesha._get_clusters_locations()
+
+    def test_get_cluster_locations_conflict(self):
+        # pylint: disable=protected-access
+        # Raise an exception when there is a user-defined cluster that conlicts
+        # with Orchestrator clusters.
+        old_setting = Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE
+        conflicted_location = ',foo:{pool}/{namespace}'.format_map(
+            self.clusters['foo'])
+        Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE = old_setting + conflicted_location
+        with self.assertRaises(NFSException):
+            ganesha.Ganesha._get_clusters_locations()
+
 
 class NFSGaneshaUiControllerTest(ControllerTestCase):
     @classmethod