]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/smb: add support for rados locks to rados store
authorJohn Mulligan <jmulligan@redhat.com>
Mon, 15 Jul 2024 19:22:22 +0000 (15:22 -0400)
committerJohn Mulligan <jmulligan@redhat.com>
Tue, 20 Aug 2024 13:53:56 +0000 (09:53 -0400)
Add support for using rados object locks to the rados store classes.
Callers directly using the rados store outside the store interface will
be able to make use of locking.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
src/pybind/mgr/smb/rados_store.py

index 41179bc131310ee3710c178457613a52fcd12d59..1e739088e7eac18dfd7cdca3ce85f2572f65a535 100644 (file)
@@ -1,8 +1,18 @@
-from typing import TYPE_CHECKING, Callable, Collection, Iterator, Optional
+from typing import (
+    TYPE_CHECKING,
+    Callable,
+    Collection,
+    Iterator,
+    Optional,
+    Tuple,
+)
 
+import contextlib
 import functools
 import json
 import logging
+import time
+import uuid
 
 import rados
 
@@ -27,6 +37,7 @@ class RADOSConfigEntry:
         self._pool = pool
         self._ns = ns
         self._key = key
+        self._ioctx = None
 
     @property
     def uri(self) -> str:
@@ -44,7 +55,7 @@ class RADOSConfigEntry:
     def read(self) -> str:
         """Read a RAODS object."""
         log.debug('rados read of %s', self.full_key)
-        with self._rados.open_ioctx(self._pool) as ioctx:
+        with self._shared_ioctx() as ioctx:
             ioctx.set_namespace(self._ns)
             try:
                 val = ioctx.read(self._key, _CHUNK_SIZE).decode()
@@ -58,7 +69,7 @@ class RADOSConfigEntry:
         log.debug('rados write to %s', self.full_key)
         data = content.encode('utf-8')
         assert len(data) < _CHUNK_SIZE
-        with self._rados.open_ioctx(self._pool) as ioctx:
+        with self._shared_ioctx() as ioctx:
             ioctx.set_namespace(self._ns)
             ioctx.write_full(self._key, data)
 
@@ -66,7 +77,12 @@ class RADOSConfigEntry:
         """Get the deserialized store entry value."""
         if not self.exists():
             raise KeyError(self.full_key)
-        return json.loads(self.read())
+        data = self.read()
+        if not data:
+            # empty data is equivalent to object not existing.
+            # this may occur if a lock is taken.
+            raise KeyError(self.full_key)
+        return json.loads(data)
 
     def set(self, obj: Simplified) -> None:
         """Set the store entry value to that of the serialized value of obj."""
@@ -75,7 +91,7 @@ class RADOSConfigEntry:
     def remove(self) -> bool:
         """Remove the current entry from the store."""
         log.debug('rados remove of %s', self.full_key)
-        with self._rados.open_ioctx(self._pool) as ioctx:
+        with self._shared_ioctx() as ioctx:
             ioctx.set_namespace(self._ns)
             try:
                 ioctx.remove_object(self._key)
@@ -89,7 +105,7 @@ class RADOSConfigEntry:
         """Returns true if the entry currently exists within the store."""
         log.debug('rados exists of %s', self.full_key)
         try:
-            with self._rados.open_ioctx(self._pool) as ioctx:
+            with self._shared_ioctx() as ioctx:
                 ioctx.set_namespace(self._ns)
                 ioctx.stat(self._key)
             found = True
@@ -98,6 +114,62 @@ class RADOSConfigEntry:
         log.debug('rados exists result of %s = %r', self.full_key, found)
         return found
 
+    @contextlib.contextmanager
+    def locked(self, name: str) -> Iterator[None]:
+        """Place a rados lock on the object for the duration of the context
+        manager. Requires a lock name.
+        """
+        with self._shared_ioctx() as ioctx:
+            ioctx.set_namespace(self._ns)
+            cookie = self._acquire_lock(ioctx, name)
+            try:
+                yield None
+            finally:
+                self._release_lock(ioctx, name, cookie)
+
+    @contextlib.contextmanager
+    def _shared_ioctx(self) -> Iterator[rados.Ioctx]:
+        """Helper for returning a ioctx for nested operations."""
+        if self._ioctx is not None:
+            yield self._ioctx
+            return
+        with self._rados.open_ioctx(self._pool) as ioctx:
+            self._ioctx = ioctx
+            try:
+                yield ioctx
+            finally:
+                self._ioctx = None
+
+    def _acquire_lock(
+        self,
+        ioctx: rados.Ioctx,
+        name: str,
+        desc: str = 'rados_store',
+        *,
+        wait_sec: float = 0.25,
+        max_wait: int = 30,
+    ) -> str:
+        """Acquire a rados lock."""
+        cookie = f'mgr:smb:{uuid.uuid4()}'
+        for _ in range(int(max_wait / wait_sec)):
+            try:
+                ioctx.lock_exclusive(
+                    self._key, name, cookie, desc=desc, duration=None
+                )
+                return cookie
+            except rados.ObjectBusy as err:
+                log.debug("object busy: %r, %r, %r", self._key, name, cookie)
+                time.sleep(wait_sec)
+                last_err = err
+        log.warning('failed to acquire lock in %ssec: %r', max_wait, last_err)
+        raise last_err
+
+    def _release_lock(
+        self, ioctx: rados.Ioctx, name: str, cookie: str
+    ) -> None:
+        """Release a rados lock."""
+        ioctx.unlock(self._key, name, cookie)
+
 
 class RADOSConfigStore:
     """A config store that saves entries in a RADOS pool.