]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/volumes/nfs: Fetch exports in persistent way if mgr is restarted
authorVarsha Rao <varao@redhat.com>
Fri, 5 Jun 2020 09:14:59 +0000 (14:44 +0530)
committerVarsha Rao <varao@redhat.com>
Tue, 30 Jun 2020 17:47:38 +0000 (23:17 +0530)
Fixes: https://tracker.ceph.com/issues/45740
Signed-off-by: Varsha Rao <varao@redhat.com>
src/pybind/mgr/volumes/fs/nfs.py

index 761a2e88ea8826660c2d85d07ad70c4925f4fdfb..e24dd2f04214f313b4c79a353babe7167ffa4372 100644 (file)
@@ -1,6 +1,7 @@
 import errno
 import json
 import logging
+from typing import List
 
 from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec
 from rados import TimedOut
@@ -11,10 +12,119 @@ from .fs_util import create_pool
 
 log = logging.getLogger(__name__)
 
+def available_clusters(mgr):
+    completion = mgr.describe_service(service_type='nfs')
+    mgr._orchestrator_wait([completion])
+    orchestrator.raise_if_exception(completion)
+    return [cluster.spec.service_id for cluster in completion.result]
+
 class GaneshaConfParser(object):
     def __init__(self, raw_config):
         self.pos = 0
         self.text = ""
+        self.clean_config(raw_config)
+
+    def clean_config(self, raw_config):
+        for line in raw_config.split("\n"):
+            self.text += line
+            if line.startswith("%"):
+                self.text += "\n"
+
+    def remove_whitespaces_quotes(self):
+        if self.text.startswith("%url"):
+            self.text = self.text.replace('"', "")
+        else:
+            self.text = "".join(self.text.split())
+
+    def stream(self):
+        return self.text[self.pos:]
+
+    def parse_block_name(self):
+        idx = self.stream().find('{')
+        if idx == -1:
+            raise Exception("Cannot find block name")
+        block_name = self.stream()[:idx]
+        self.pos += idx+1
+        return block_name
+
+    def parse_block_or_section(self):
+        if self.stream().startswith("%url "):
+            # section line
+            self.pos += 5
+            idx = self.stream().find('\n')
+            if idx == -1:
+                value = self.stream()
+                self.pos += len(value)
+            else:
+                value = self.stream()[:idx]
+                self.pos += idx+1
+            block_dict = {'block_name': '%url', 'value': value}
+            return block_dict
+
+        block_dict = {'block_name': self.parse_block_name().upper()}
+        self.parse_block_body(block_dict)
+        if self.stream()[0] != '}':
+            raise Exception("No closing bracket '}' found at the end of block")
+        self.pos += 1
+        return block_dict
+
+    def parse_parameter_value(self, raw_value):
+        if raw_value.find(',') != -1:
+            return [self.parse_parameter_value(v.strip())
+                    for v in raw_value.split(',')]
+        try:
+            return int(raw_value)
+        except ValueError:
+            if raw_value == "true":
+                return True
+            if raw_value == "false":
+                return False
+            if raw_value.find('"') == 0:
+                return raw_value[1:-1]
+            return raw_value
+
+    def parse_stanza(self, block_dict):
+        equal_idx = self.stream().find('=')
+        if equal_idx == -1:
+            raise Exception("Malformed stanza: no equal symbol found.")
+        semicolon_idx = self.stream().find(';')
+        parameter_name = self.stream()[:equal_idx].lower()
+        parameter_value = self.stream()[equal_idx+1:semicolon_idx]
+        block_dict[parameter_name] = self.parse_parameter_value(parameter_value)
+        self.pos += semicolon_idx+1
+
+    def parse_block_body(self, block_dict):
+        while True:
+            if self.stream().find('}') == 0:
+                # block end
+                return
+
+            last_pos = self.pos
+            semicolon_idx = self.stream().find(';')
+            lbracket_idx = self.stream().find('{')
+            is_semicolon = (semicolon_idx != -1)
+            is_lbracket = (lbracket_idx != -1)
+            is_semicolon_lt_lbracket = (semicolon_idx < lbracket_idx)
+
+            if is_semicolon and ((is_lbracket and is_semicolon_lt_lbracket) or not is_lbracket):
+                self.parse_stanza(block_dict)
+            elif is_lbracket and ((is_semicolon and not is_semicolon_lt_lbracket) or
+                                  (not is_semicolon)):
+                if '_blocks_' not in block_dict:
+                    block_dict['_blocks_'] = []
+                block_dict['_blocks_'].append(self.parse_block_or_section())
+            else:
+                raise Exception("Malformed stanza: no semicolon found.")
+
+            if last_pos == self.pos:
+                raise Exception("Infinite loop while parsing block content")
+
+    def parse(self):
+        self.remove_whitespaces_quotes()
+        blocks = []
+        while self.stream():
+            blocks.append(self.parse_block_or_section())
+        return blocks
 
     @staticmethod
     def _indentation(depth, size=4):
@@ -48,7 +158,7 @@ class GaneshaConfParser(object):
         return conf_str
 
     @staticmethod
-    def write_block(block, depth):
+    def write_block(block, depth=0):
         if block['block_name'] == "%url":
             return '%url "{}"\n\n'.format(block['value'])
 
@@ -58,17 +168,9 @@ class GaneshaConfParser(object):
         conf_str += " {\n"
         conf_str += GaneshaConfParser.write_block_body(block, depth+1)
         conf_str += GaneshaConfParser._indentation(depth)
-        conf_str += "}\n\n"
+        conf_str += "}\n"
         return conf_str
 
-    @staticmethod
-    def write_conf(blocks):
-        if not isinstance(blocks, list):
-            blocks = [blocks]
-        conf_str = ""
-        for block in blocks:
-            conf_str += GaneshaConfParser.write_block(block, 0)
-        return conf_str
 
 class CephFSFSal():
     def __init__(self, name, user_id=None, fs_name=None, sec_label_xattr=None,
@@ -162,6 +264,25 @@ class Export(object):
         self.transports = ["TCP"]
         self.clients = clients
 
+    @classmethod
+    def from_export_block(cls, export_block, cluster_id):
+        log.debug("parsing export block: %s", export_block)
+
+        fsal_block = [b for b in export_block['_blocks_']
+                      if b['block_name'] == "FSAL"]
+
+        client_blocks = [b for b in export_block['_blocks_']
+                         if b['block_name'] == "CLIENT"]
+
+        return cls(export_block['export_id'],
+                   export_block['path'],
+                   CephFSFSal.from_fsal_block(fsal_block[0]),
+                   cluster_id,
+                   export_block['pseudo'],
+                   export_block['access_type'],
+                   [Client.from_client_block(client)
+                    for client in client_blocks])
+
     def to_export_block(self):
         # pylint: disable=too-many-branches
         result = {
@@ -174,7 +295,7 @@ class Export(object):
             'attr_expiration_time': self.attr_expiration_time,
             'security_label': self.security_label,
             'protocols': self.protocols,
-            'transports': [self.transports],
+            'transports': self.transports,
         }
         result['_blocks_'] = [self.fsal.to_fsal_block()]
         result['_blocks_'].extend([client.to_client_block()
@@ -196,9 +317,22 @@ class FSExport(object):
         self.mgr = mgr
         self.rados_pool = 'nfs-ganesha'
         self.rados_namespace = namespace #TODO check if cluster exists
-        self.export_conf_blocks = []
         self.exports = {}
 
+        try:
+            log.info("Begin export parsing")
+            for cluster_id in available_clusters(self.mgr):
+                # Removes 'ganesha-' prefixes from cluster ids.
+                cluster_id = cluster_id[cluster_id.index('-')+1:]
+                self.export_conf_objs = []  # type: List[Export]
+                self._read_raw_config(cluster_id)
+                self.exports[cluster_id] = self.export_conf_objs
+            log.info(f"Exports parsed successfully {self.exports.items()}")
+        except orchestrator.NoOrchestrator:
+            # Pass it for vstart
+            log.info("Orchestrator not found")
+            pass
+
     @staticmethod
     def _check_rados_notify(ioctx, obj):
         try:
@@ -254,8 +388,24 @@ class FSExport(object):
                 break
         return nid
 
+    def _read_raw_config(self, rados_namespace):
+        with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
+            if rados_namespace:
+                ioctx.set_namespace(rados_namespace)
+
+            for obj in ioctx.list_objects():
+                if obj.key.startswith("export-"):
+                    size, _ = obj.stat()
+                    raw_config = obj.read(size)
+                    raw_config = raw_config.decode("utf-8")
+                    log.debug("read export configuration from rados "
+                            "object %s/%s/%s:\n%s", self.rados_pool,
+                            rados_namespace, obj.key, raw_config)
+                    self.export_conf_objs.append(Export.from_export_block(
+                        GaneshaConfParser(raw_config).parse()[0], rados_namespace))
+
     def _write_raw_config(self, conf_block, obj, append=False):
-        raw_config = GaneshaConfParser.write_conf(conf_block)
+        raw_config = GaneshaConfParser.write_block(conf_block)
         with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
             if self.rados_namespace:
                 ioctx.set_namespace(self.rados_namespace)
@@ -403,12 +553,6 @@ class NFSCluster:
             ioctx.remove_object(common_conf)
             log.info(f"Deleted object:{common_conf}")
 
-    def available_clusters(self):
-        completion = self.mgr.describe_service(service_type='nfs')
-        self.mgr._orchestrator_wait([completion])
-        orchestrator.raise_if_exception(completion)
-        return [cluster.spec.service_id for cluster in completion.result]
-
     def _set_cluster_id(self, cluster_id):
         self.cluster_id = "ganesha-%s" % cluster_id
 
@@ -445,7 +589,7 @@ class NFSCluster:
             self._set_cluster_id(cluster_id)
             self.create_empty_rados_obj()
 
-            if self.cluster_id not in self.available_clusters():
+            if self.cluster_id not in available_clusters(self.mgr):
                 self._call_orch_apply_nfs(placement)
                 return 0, "NFS Cluster Created Successfully", ""
             return 0, "", f"{self.cluster_id} cluster already exists"
@@ -457,7 +601,7 @@ class NFSCluster:
         try:
             self._set_pool_namespace(cluster_id)
             self._set_cluster_id(cluster_id)
-            if self.cluster_id in self.available_clusters():
+            if self.cluster_id in available_clusters(self.mgr):
                 self._call_orch_apply_nfs(placement)
                 return 0, "NFS Cluster Updated Successfully", ""
             return -errno.EINVAL, "", "Cluster does not exist"
@@ -467,10 +611,10 @@ class NFSCluster:
 
     def delete_nfs_cluster(self, cluster_id):
         try:
+            self._set_pool_namespace(cluster_id)
             self._set_cluster_id(cluster_id)
-            cluster_list = self.available_clusters()
-
-            if self.cluster_id in self.available_clusters():
+            cluster_list = available_clusters(self.mgr)
+            if self.cluster_id in cluster_list:
                 self.mgr.fs_export.delete_all_exports(cluster_id)
                 completion = self.mgr.remove_service('nfs.' + self.cluster_id)
                 self.mgr._orchestrator_wait([completion])