-from typing import TYPE_CHECKING, Callable, Collection, Iterator, Optional
+from typing import (
+ TYPE_CHECKING,
+ Callable,
+ Collection,
+ Iterator,
+ Optional,
+ Tuple,
+)
+import contextlib
import functools
import json
import logging
+import time
+import uuid
import rados
self._pool = pool
self._ns = ns
self._key = key
+ self._ioctx = None
@property
def uri(self) -> str:
def read(self) -> str:
"""Read a RAODS object."""
log.debug('rados read of %s', self.full_key)
- with self._rados.open_ioctx(self._pool) as ioctx:
+ with self._shared_ioctx() as ioctx:
ioctx.set_namespace(self._ns)
try:
val = ioctx.read(self._key, _CHUNK_SIZE).decode()
log.debug('rados write to %s', self.full_key)
data = content.encode('utf-8')
assert len(data) < _CHUNK_SIZE
- with self._rados.open_ioctx(self._pool) as ioctx:
+ with self._shared_ioctx() as ioctx:
ioctx.set_namespace(self._ns)
ioctx.write_full(self._key, data)
"""Get the deserialized store entry value."""
if not self.exists():
raise KeyError(self.full_key)
- return json.loads(self.read())
+ data = self.read()
+ if not data:
+ # empty data is equivalent to object not existing.
+ # this may occur if a lock is taken.
+ raise KeyError(self.full_key)
+ return json.loads(data)
def set(self, obj: Simplified) -> None:
"""Set the store entry value to that of the serialized value of obj."""
def remove(self) -> bool:
"""Remove the current entry from the store."""
log.debug('rados remove of %s', self.full_key)
- with self._rados.open_ioctx(self._pool) as ioctx:
+ with self._shared_ioctx() as ioctx:
ioctx.set_namespace(self._ns)
try:
ioctx.remove_object(self._key)
"""Returns true if the entry currently exists within the store."""
log.debug('rados exists of %s', self.full_key)
try:
- with self._rados.open_ioctx(self._pool) as ioctx:
+ with self._shared_ioctx() as ioctx:
ioctx.set_namespace(self._ns)
ioctx.stat(self._key)
found = True
log.debug('rados exists result of %s = %r', self.full_key, found)
return found
+ @contextlib.contextmanager
+ def locked(self, name: str) -> Iterator[None]:
+ """Place a rados lock on the object for the duration of the context
+ manager. Requires a lock name.
+ """
+ with self._shared_ioctx() as ioctx:
+ ioctx.set_namespace(self._ns)
+ cookie = self._acquire_lock(ioctx, name)
+ try:
+ yield None
+ finally:
+ self._release_lock(ioctx, name, cookie)
+
+ @contextlib.contextmanager
+ def _shared_ioctx(self) -> Iterator[rados.Ioctx]:
+ """Helper for returning a ioctx for nested operations."""
+ if self._ioctx is not None:
+ yield self._ioctx
+ return
+ with self._rados.open_ioctx(self._pool) as ioctx:
+ self._ioctx = ioctx
+ try:
+ yield ioctx
+ finally:
+ self._ioctx = None
+
+ def _acquire_lock(
+ self,
+ ioctx: rados.Ioctx,
+ name: str,
+ desc: str = 'rados_store',
+ *,
+ wait_sec: float = 0.25,
+ max_wait: int = 30,
+ ) -> str:
+ """Acquire a rados lock."""
+ cookie = f'mgr:smb:{uuid.uuid4()}'
+ for _ in range(int(max_wait / wait_sec)):
+ try:
+ ioctx.lock_exclusive(
+ self._key, name, cookie, desc=desc, duration=None
+ )
+ return cookie
+ except rados.ObjectBusy as err:
+ log.debug("object busy: %r, %r, %r", self._key, name, cookie)
+ time.sleep(wait_sec)
+ last_err = err
+ log.warning('failed to acquire lock in %ssec: %r', max_wait, last_err)
+ raise last_err
+
+ def _release_lock(
+ self, ioctx: rados.Ioctx, name: str, cookie: str
+ ) -> None:
+ """Release a rados lock."""
+ ioctx.unlock(self._key, name, cookie)
+
class RADOSConfigStore:
"""A config store that saves entries in a RADOS pool.