-from typing import List, Optional
+from typing import Dict, List, Optional, Tuple
import logging
import posixpath
import stat
+import time
import cephfs
from mgr_util import CephfsClient, Module_T, open_filesystem
self._mgr = mgr
self._cephfs_client = client or CephfsClient(mgr)
- def resolve(
- self, volume: str, subvolumegroup: str, subvolume: str, path: str
+ def resolve_subvolume_path(
+ self, volume: str, subvolumegroup: str, subvolume: str
) -> str:
- """Given a volume, subvolumegroup, subvolume, and path, return the real
- path within the file system. subvolumegroup and subvolume may be empty
- strings when no subvolume is being used.
+ """Given a volume, subvolumegroup, and subvolume, return the real path
+ within the file system. subvolumegroup and subvolume may be empty strings
+ when no subvolume is being used.
"""
- path = path.lstrip('/')
- if not (subvolumegroup or subvolume):
- return f'/{path}'
cmd = {
'prefix': 'fs subvolume getpath',
'vol_name': volume,
ret, data, status = self._mgr.mon_command(cmd)
if ret != 0:
raise CephFSSubvolumeResolutionError(status)
- log.debug('Mapped subvolume to path: %r', data)
- return posixpath.join(data.strip(), path)
+ log.info('Mapped subvolume to path: %r', data)
+ return data.strip()
+
+ def resolve(
+ self, volume: str, subvolumegroup: str, subvolume: str, path: str
+ ) -> str:
+ """Given a volume, subvolumegroup, subvolume, and path, return the real
+ path within the file system. subvolumegroup and subvolume may be empty
+ strings when no subvolume is being used.
+ """
+ path = path.lstrip('/')
+ if not (subvolumegroup or subvolume):
+ return f'/{path}'
+ subvolume_path = self.resolve_subvolume_path(
+ volume, subvolumegroup, subvolume
+ )
+ return posixpath.join(subvolume_path, path)
def resolve_exists(
self, volume: str, subvolumegroup: str, subvolume: str, path: str
raise NotADirectoryError(volpath)
log.debug('Verified that %r exists in %r', volpath, volume)
return volpath
+
+
+class _TTLCache:
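+    """A simple time-to-live (TTL) cache mapping (volume, subvolumegroup,
+    subvolume) keys to resolved path strings. Entries expire ttl seconds
+    after they are set.
+    """
+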
+ def __init__(self, maxsize: int = 512, ttl: float = 300.0) -> None:
+ self.cache: Dict[Tuple[str, str, str], Tuple[str, float]] = {}
+ self.maxsize: int = maxsize
+ self.ttl: float = ttl
+
+ def _evict(self) -> None:
+ """Evicts items that have expired or if cache size exceeds maxsize."""
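+        # Use a monotonic clock so TTL checks are unaffected by system
+        # clock changes.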
+ current_time: float = time.monotonic()
+        keys_to_evict: List[Tuple[str, str, str]] = [
+ key
+ for key, (_, timestamp) in self.cache.items()
+ if current_time - timestamp > self.ttl
+ ]
+ for key in keys_to_evict:
+ del self.cache[key]
+
+ # Further evict if cache size exceeds maxsize
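+        # Dicts preserve insertion order, so trimming keys from the front
+        # drops the oldest entries first (simple FIFO eviction).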
+ if len(self.cache) > self.maxsize:
+ for key in list(self.cache.keys())[
+ : len(self.cache) - self.maxsize
+ ]:
+ del self.cache[key]
+
+ def get(self, key: Tuple[str, str, str]) -> Optional[str]:
+ """Retrieve item from cache if it exists and is not expired."""
+ self._evict() # Ensure expired items are removed
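+        # A hit does not refresh an entry's timestamp; entries always expire
+        # ttl seconds after they were last set (TTL rather than LRU).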
+ if key in self.cache:
+ value, _ = self.cache[key]
+ return value
+ return None
+
+ def set(self, key: Tuple[str, str, str], value: str) -> None:
+ """Set item in cache, evicting expired or excess items."""
+ self._evict() # Ensure expired items are removed
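+        # Eviction runs before the insert, so the cache may briefly hold
+        # maxsize + 1 entries until the next get() or set() call.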
+ self.cache[key] = (value, time.monotonic())
+
+ def clear(self) -> None:
+ """Clear all items in the cache."""
+ self.cache.clear()
+
+ def __len__(self) -> int:
+ """Return the number of items currently in the cache."""
+ return len(self.cache)
+
+
+class CachingCephFSPathResolver(CephFSPathResolver):
+ """
+    A subclass of CephFSPathResolver that caches the results of
+    resolve_subvolume_path to improve performance by reducing redundant
+    path resolutions.
+
+ This implementation uses a TTL (Time-To-Live) cache rather than an LRU (Least
+ Recently Used) cache. The TTL cache is preferred in this scenario because
+ the validity of cached paths is time-sensitive, and we want to ensure that
+ paths are refreshed after a certain period regardless of access frequency.
+    An RLock could be used to synchronize access to the cache, but that is
+    not required for now and can be revisited later.
+ """
+
+ def __init__(
+ self, mgr: Module_T, *, client: Optional[CephfsClient] = None
+ ) -> None:
+ super().__init__(mgr, client=client)
+        # Initialize a TTL cache with a short TTL so that cached subvolume
+        # paths are refreshed frequently.
+ self._cache = _TTLCache(maxsize=512, ttl=5)
+
+ def resolve_subvolume_path(
+ self, volume: str, subvolumegroup: str, subvolume: str
+ ) -> str:
+ cache_key = (volume, subvolumegroup, subvolume)
+ cached_path = self._cache.get(cache_key)
+ if cached_path:
+ log.debug("Cache hit for key: %r", cache_key)
+ return cached_path
+
+ log.debug("Cache miss for key: %r", cache_key)
+ resolved_path = super().resolve_subvolume_path(
+ volume, subvolumegroup, subvolume
+ )
+ self._cache.set(cache_key, resolved_path)
+ return resolved_path
+import time
+import unittest
from unittest import mock
import pytest
import smb.fs
+from smb.fs import _TTLCache
def test_mocked_fs_authorizer():
)
with pytest.raises(FileNotFoundError):
fspr.resolve_exists('cephfs', 'alpha', 'beta', '/zowie')
+
+
+class TestTTLCache(unittest.TestCase):
+ def setUp(self):
+ self.cache = _TTLCache(
+ ttl=1, maxsize=3
+ ) # Short TTL and small size for testing
+
+    def test_cache_set_and_get(self):
+        self.cache.set(('key1', 'key2', 'key3'), 'value1')
+        self.assertEqual(
+            self.cache.get(('key1', 'key2', 'key3')), 'value1'
+        )
+
+    def test_cache_expiry(self):
+        self.cache.set(('key1', 'key2', 'key3'), 'value1')
+        time.sleep(1.5)  # Wait for the TTL to expire
+        self.assertIsNone(self.cache.get(('key1', 'key2', 'key3')))
+
+    def test_cache_eviction(self):
+        # Fill the cache to maxsize
+        self.cache.set(('key1', 'key2', 'key3'), 'value1')
+        self.cache.set(('key4', 'key5', 'key6'), 'value2')
+        self.cache.set(('key7', 'key8', 'key9'), 'value3')
+
+        # Add another entry to trigger eviction of the oldest
+        self.cache.set(('key10', 'key11', 'key12'), 'value4')
+
+        # Ensure the oldest entry is evicted
+        self.assertIsNone(self.cache.get(('key1', 'key2', 'key3')))
+
+        # Ensure the other entries are still present
+        self.assertEqual(
+            self.cache.get(('key4', 'key5', 'key6')), 'value2'
+        )
+        self.assertEqual(
+            self.cache.get(('key7', 'key8', 'key9')), 'value3'
+        )
+        self.assertEqual(
+            self.cache.get(('key10', 'key11', 'key12')), 'value4'
+        )
+
+    def test_cache_clear(self):
+        self.cache.set(('key1', 'key2', 'key3'), 'value1')
+        self.cache.clear()
+        self.assertIsNone(self.cache.get(('key1', 'key2', 'key3')))
+
+
+def test_caching_fs_path_resolver(monkeypatch):
+ monkeypatch.setattr('cephfs.ObjectNotFound', KeyError)
+
+ def mmcmd(cmd):
+ if cmd['prefix'] == 'fs subvolume getpath':
+ if (
+ cmd['vol_name'] == 'cached_cephfs'
+ and cmd['sub_name'] == 'cached_beta'
+ ):
+ return 0, '/volumes/cool/path/f00d-600d', ''
+ return -5, '', 'cached_eek'
+
+ m = mock.MagicMock()
+ m.mon_command.side_effect = mmcmd
+
+ fspr = smb.fs.CachingCephFSPathResolver(m, client=m)
+
+ # Resolve a path (cache miss)
+ path = fspr.resolve(
+ 'cached_cephfs', 'cached_alpha', 'cached_beta', '/zowie'
+ )
+ assert path == '/volumes/cool/path/f00d-600d/zowie'
+ assert len(fspr._cache) == 1
+ assert m.mon_command.call_count == 1
+
+ # Resolve the same path again (cache hit)
+ path = fspr.resolve(
+ 'cached_cephfs', 'cached_alpha', 'cached_beta', '/zowie'
+ )
+ assert path == '/volumes/cool/path/f00d-600d/zowie'
+
+ # Ensure cache size remains the same
+ assert len(fspr._cache) == 1
+ assert m.mon_command.call_count == 1
+
+ path = fspr.resolve('cached_cephfs', '', '', '/zowie')
+ assert path == '/zowie'
+
+    # If the subvolume is empty, the cache size should remain the same
+ assert len(fspr._cache) == 1
+ assert m.mon_command.call_count == 1
+
+ # Clear cache and validate
+ fspr._cache.clear()
+ assert len(fspr._cache) == 0
+
+ # Re-resolve to repopulate cache
+ path = fspr.resolve(
+ 'cached_cephfs', 'cached_alpha', 'cached_beta', '/zowie'
+ )
+ assert path == '/volumes/cool/path/f00d-600d/zowie'
+ assert len(fspr._cache) == 1
+ assert m.mon_command.call_count == 2