]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/smb: cache subvolume (& subvolume group) path lookups 57853/head
authorAvan Thakkar <athakkar@redhat.com>
Mon, 3 Jun 2024 18:39:31 +0000 (00:09 +0530)
committerAvan Thakkar <athakkar@redhat.com>
Wed, 3 Jul 2024 04:33:17 +0000 (10:03 +0530)
Implement custom TTL cache for CephFS path resolver

- Added TTLCache class to manage cache with time-to-live expiration.
- Integrated TTLCache into CachingCephFSPathResolver.

Signed-off-by: Avan Thakkar <athakkar@redhat.com>
src/pybind/mgr/smb/fs.py
src/pybind/mgr/smb/module.py
src/pybind/mgr/smb/tests/test_fs.py

index 8aaa8bcde060abe2b743aae1fb05305b585d90d5..dc9613f21fced4f1d4af8670b9b3c835d416fd20 100644 (file)
@@ -1,8 +1,9 @@
-from typing import List, Optional
+from typing import Dict, List, Optional, Tuple
 
 import logging
 import posixpath
 import stat
+import time
 
 import cephfs
 from mgr_util import CephfsClient, Module_T, open_filesystem
@@ -60,16 +61,13 @@ class CephFSPathResolver:
         self._mgr = mgr
         self._cephfs_client = client or CephfsClient(mgr)
 
-    def resolve(
-        self, volume: str, subvolumegroup: str, subvolume: str, path: str
+    def resolve_subvolume_path(
+        self, volume: str, subvolumegroup: str, subvolume: str
     ) -> str:
-        """Given a volume, subvolumegroup, subvolume, and path, return the real
-        path within the file system. subvolumegroup and subvolume may be empty
-        strings when no subvolume is being used.
+        """Given a volume, subvolumegroup, and subvolume, return the real path
+        within the file system. subvolumegroup and subvolume may be empty strings
+        when no subvolume is being used.
         """
-        path = path.lstrip('/')
-        if not (subvolumegroup or subvolume):
-            return f'/{path}'
         cmd = {
             'prefix': 'fs subvolume getpath',
             'vol_name': volume,
@@ -81,8 +79,23 @@ class CephFSPathResolver:
         ret, data, status = self._mgr.mon_command(cmd)
         if ret != 0:
             raise CephFSSubvolumeResolutionError(status)
-        log.debug('Mapped subvolume to path: %r', data)
-        return posixpath.join(data.strip(), path)
+        log.info('Mapped subvolume to path: %r', data)
+        return data.strip()
+
+    def resolve(
+        self, volume: str, subvolumegroup: str, subvolume: str, path: str
+    ) -> str:
+        """Given a volume, subvolumegroup, subvolume, and path, return the real
+        path within the file system. subvolumegroup and subvolume may be empty
+        strings when no subvolume is being used.
+        """
+        path = path.lstrip('/')
+        if not (subvolumegroup or subvolume):
+            return f'/{path}'
+        subvolume_path = self.resolve_subvolume_path(
+            volume, subvolumegroup, subvolume
+        )
+        return posixpath.join(subvolume_path, path)
 
     def resolve_exists(
         self, volume: str, subvolumegroup: str, subvolume: str, path: str
@@ -108,3 +121,86 @@ class CephFSPathResolver:
                 raise NotADirectoryError(volpath)
         log.debug('Verified that %r exists in %r', volpath, volume)
         return volpath
+
+
+class _TTLCache:
+    def __init__(self, maxsize: int = 512, ttl: float = 300.0) -> None:
+        self.cache: Dict[Tuple[str, str, str], Tuple[str, float]] = {}
+        self.maxsize: int = maxsize
+        self.ttl: float = ttl
+
+    def _evict(self) -> None:
+        """Evicts items that have expired or if cache size exceeds maxsize."""
+        current_time: float = time.monotonic()
+        keys_to_evict: list[Tuple[str, str, str]] = [
+            key
+            for key, (_, timestamp) in self.cache.items()
+            if current_time - timestamp > self.ttl
+        ]
+        for key in keys_to_evict:
+            del self.cache[key]
+
+        # Further evict if cache size exceeds maxsize
+        if len(self.cache) > self.maxsize:
+            for key in list(self.cache.keys())[
+                : len(self.cache) - self.maxsize
+            ]:
+                del self.cache[key]
+
+    def get(self, key: Tuple[str, str, str]) -> Optional[str]:
+        """Retrieve item from cache if it exists and is not expired."""
+        self._evict()  # Ensure expired items are removed
+        if key in self.cache:
+            value, _ = self.cache[key]
+            return value
+        return None
+
+    def set(self, key: Tuple[str, str, str], value: str) -> None:
+        """Set item in cache, evicting expired or excess items."""
+        self._evict()  # Ensure expired items are removed
+        self.cache[key] = (value, time.monotonic())
+
+    def clear(self) -> None:
+        """Clear all items in the cache."""
+        self.cache.clear()
+
+    def __len__(self) -> int:
+        """Return the number of items currently in the cache."""
+        return len(self.cache)
+
+
+class CachingCephFSPathResolver(CephFSPathResolver):
+    """
+    A subclass of CephFSPathResolver that adds caching to the resolve method
+    to improve performance by reducing redundant path resolutions.
+
+    This implementation uses a TTL (Time-To-Live) cache rather than an LRU (Least
+    Recently Used) cache. The TTL cache is preferred in this scenario because
+    the validity of cached paths is time-sensitive, and we want to ensure that
+    paths are refreshed after a certain period regardless of access frequency.
+    Rlock can be used to synchronize access to the cache, but that is something
+    not required for now & can be later tested.
+    """
+
+    def __init__(
+        self, mgr: Module_T, *, client: Optional[CephfsClient] = None
+    ) -> None:
+        super().__init__(mgr, client=client)
+        # Initialize a TTL cache.
+        self._cache = _TTLCache(maxsize=512, ttl=5)
+
+    def resolve_subvolume_path(
+        self, volume: str, subvolumegroup: str, subvolume: str
+    ) -> str:
+        cache_key = (volume, subvolumegroup, subvolume)
+        cached_path = self._cache.get(cache_key)
+        if cached_path:
+            log.debug("Cache hit for key: %r", cache_key)
+            return cached_path
+
+        log.debug("Cache miss for key: %r", cache_key)
+        resolved_path = super().resolve_subvolume_path(
+            volume, subvolumegroup, subvolume
+        )
+        self._cache.set(cache_key, resolved_path)
+        return resolved_path
index 70c97d1a975483f3e7a822ce3cdf5eb32ae61477..43ad681769ad731e9767f382223b92e6e4ddc59f 100644 (file)
@@ -52,7 +52,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         self._public_store = (
             public_store or rados_store.RADOSConfigStore.init(self)
         )
-        path_resolver = path_resolver or fs.CephFSPathResolver(self)
+        path_resolver = path_resolver or fs.CachingCephFSPathResolver(self)
         # Why the honk is the cast needed but path_resolver doesn't need it??
         # Sometimes mypy drives me batty.
         authorizer = cast(
index 7af20eb6265ee642f7f76081eb1e63f013b9ebed..5653ccfd0816db6a72f89822e93eb862381dea50 100644 (file)
@@ -1,8 +1,11 @@
+import time
+import unittest
 from unittest import mock
 
 import pytest
 
 import smb.fs
+from smb.fs import _TTLCache
 
 
 def test_mocked_fs_authorizer():
@@ -65,3 +68,108 @@ def test_mocked_fs_path_resolver(monkeypatch):
     )
     with pytest.raises(FileNotFoundError):
         fspr.resolve_exists('cephfs', 'alpha', 'beta', '/zowie')
+
+
+class TestTTLCache(unittest.TestCase):
+    def setUp(self):
+        self.cache = _TTLCache(
+            ttl=1, maxsize=3
+        )  # Short TTL and small size for testing
+
+    def test_cache_set_and_get(self):
+        self.cache.set(('key1', 'key2', 'key3'), ('value1', 'val', 'test'))
+        self.assertEqual(
+            self.cache.get(('key1', 'key2', 'key3')),
+            ('value1', 'val', 'test'),
+        )
+
+    def test_cache_expiry(self):
+        self.cache.set(('key1', 'key2', 'key3'), ('value1', 'val', 'test'))
+        time.sleep(1.5)  # Wait for the TTL to expire
+        self.assertIsNone(self.cache.get(('key1', 'key2', 'key3')))
+
+    def test_cache_eviction(self):
+        # Fill the cache to maxsize
+        self.cache.set(('key1', 'key2', 'key3'), ('value1', 'val', 'test'))
+        self.cache.set(('key4', 'key5', 'key6'), ('value2', 'val', 'test'))
+        self.cache.set(('key7', 'key8', 'key9'), ('value3', 'val', 'test'))
+
+        # Add another entry to trigger eviction of the oldest
+        self.cache.set(('key10', 'key11', 'key12'), ('value4', 'val', 'test'))
+
+        # Ensure oldest entry is evicted
+        self.assertIsNone(self.cache.get(('key1', 'key2', 'key3')))
+
+        # Ensure other entries are present
+        self.assertEqual(
+            self.cache.get(('key4', 'key5', 'key6')),
+            ('value2', 'val', 'test'),
+        )
+        self.assertEqual(
+            self.cache.get(('key7', 'key8', 'key9')),
+            ('value3', 'val', 'test'),
+        )
+        self.assertEqual(
+            self.cache.get(('key10', 'key11', 'key12')),
+            ('value4', 'val', 'test'),
+        )
+
+    def test_cache_clear(self):
+        self.cache.set(('key1', 'key2', 'key3'), ('value1', 'val', 'test'))
+        self.cache.clear()
+        self.assertIsNone(self.cache.get(('key1', 'key2', 'key3')))
+
+
+def test_caching_fs_path_resolver(monkeypatch):
+    monkeypatch.setattr('cephfs.ObjectNotFound', KeyError)
+
+    def mmcmd(cmd):
+        if cmd['prefix'] == 'fs subvolume getpath':
+            if (
+                cmd['vol_name'] == 'cached_cephfs'
+                and cmd['sub_name'] == 'cached_beta'
+            ):
+                return 0, '/volumes/cool/path/f00d-600d', ''
+        return -5, '', 'cached_eek'
+
+    m = mock.MagicMock()
+    m.mon_command.side_effect = mmcmd
+
+    fspr = smb.fs.CachingCephFSPathResolver(m, client=m)
+
+    # Resolve a path (cache miss)
+    path = fspr.resolve(
+        'cached_cephfs', 'cached_alpha', 'cached_beta', '/zowie'
+    )
+    assert path == '/volumes/cool/path/f00d-600d/zowie'
+    assert len(fspr._cache) == 1
+    assert m.mon_command.call_count == 1
+
+    # Resolve the same path again (cache hit)
+    path = fspr.resolve(
+        'cached_cephfs', 'cached_alpha', 'cached_beta', '/zowie'
+    )
+    assert path == '/volumes/cool/path/f00d-600d/zowie'
+
+    # Ensure cache size remains the same
+    assert len(fspr._cache) == 1
+    assert m.mon_command.call_count == 1
+
+    path = fspr.resolve('cached_cephfs', '', '', '/zowie')
+    assert path == '/zowie'
+
+    # If subvolume is empty cache size should remain the same
+    assert len(fspr._cache) == 1
+    assert m.mon_command.call_count == 1
+
+    # Clear cache and validate
+    fspr._cache.clear()
+    assert len(fspr._cache) == 0
+
+    # Re-resolve to repopulate cache
+    path = fspr.resolve(
+        'cached_cephfs', 'cached_alpha', 'cached_beta', '/zowie'
+    )
+    assert path == '/volumes/cool/path/f00d-600d/zowie'
+    assert len(fspr._cache) == 1
+    assert m.mon_command.call_count == 2