+import cephfs
import contextlib
import datetime
+import errno
import os
import socket
-import logging
import time
+import logging
from functools import wraps
+import sys
+from threading import Lock, Condition, Event
+from typing import no_type_check
+if sys.version_info >= (3, 3):
+ from threading import Timer
+else:
+ from threading import _Timer as Timer
try:
from typing import Tuple, Any, Callable
logger = logging.getLogger(__name__)
+class CephfsConnectionException(Exception):
+ def __init__(self, error_code, error_message):
+ self.errno = error_code
+ self.error_str = error_message
+
+ def to_tuple(self):
+ return self.errno, "", self.error_str
+
+ def __str__(self):
+ return "{0} ({1})".format(self.errno, self.error_str)
+
+
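A rough usage sketch (hypothetical values, not part of this patch): mgr command
handlers can turn the exception into the (retval, stdout, stderr) triple they
return, via to_tuple():

    try:
        raise CephfsConnectionException(-errno.ENOENT, "FS 'a' not found")
    except CephfsConnectionException as ce:
        retval, out, err = ce.to_tuple()  # (-errno.ENOENT, "", "FS 'a' not found")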
+class ConnectionPool(object):
+ class Connection(object):
+ def __init__(self, mgr, fs_name):
+ self.fs = None
+ self.mgr = mgr
+ self.fs_name = fs_name
+ self.ops_in_progress = 0
+ self.last_used = time.time()
+ self.fs_id = self.get_fs_id()
+
+ def get_fs_id(self):
+ fs_map = self.mgr.get('fs_map')
+ for fs in fs_map['filesystems']:
+ if fs['mdsmap']['fs_name'] == self.fs_name:
+ return fs['id']
+ raise CephfsConnectionException(
+ -errno.ENOENT, "FS '{0}' not found".format(self.fs_name))
+
+ def get_fs_handle(self):
+ self.last_used = time.time()
+ self.ops_in_progress += 1
+ return self.fs
+
+ def put_fs_handle(self, notify):
+ assert self.ops_in_progress > 0
+ self.ops_in_progress -= 1
+ if self.ops_in_progress == 0:
+ notify()
+
+ def del_fs_handle(self, waiter):
+ if waiter:
+ while self.ops_in_progress != 0:
+ waiter()
+ if self.is_connection_valid():
+ self.disconnect()
+ else:
+ self.abort()
+
+ def is_connection_valid(self):
+ fs_id = None
+ try:
+ fs_id = self.get_fs_id()
+            except CephfsConnectionException:
+ # the filesystem does not exist now -- connection is not valid.
+ pass
+ logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
+ return self.fs_id == fs_id
+
+ def is_connection_idle(self, timeout):
+            return self.ops_in_progress == 0 and (time.time() - self.last_used) >= timeout
+
+ def connect(self):
+ assert self.ops_in_progress == 0
+ logger.debug("Connecting to cephfs '{0}'".format(self.fs_name))
+ self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
+ logger.debug("Setting user ID and group ID of CephFS mount as root...")
+ self.fs.conf_set("client_mount_uid", "0")
+ self.fs.conf_set("client_mount_gid", "0")
+ logger.debug("CephFS initializing...")
+ self.fs.init()
+ logger.debug("CephFS mounting...")
+ self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
+ logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
+ self.mgr._ceph_register_client(self.fs.get_addrs())
+
+ def disconnect(self):
+ try:
+ assert self.fs
+ assert self.ops_in_progress == 0
+ logger.info("disconnecting from cephfs '{0}'".format(self.fs_name))
+ addrs = self.fs.get_addrs()
+ self.fs.shutdown()
+ self.mgr._ceph_unregister_client(addrs)
+ self.fs = None
+ except Exception as e:
+ logger.debug("disconnect: ({0})".format(e))
+ raise
+
+ def abort(self):
+ assert self.fs
+ assert self.ops_in_progress == 0
+ logger.info("aborting connection from cephfs '{0}'".format(self.fs_name))
+ self.fs.abort_conn()
+ logger.info("abort done from cephfs '{0}'".format(self.fs_name))
+ self.fs = None
+
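A hedged sketch of the Connection lifecycle (assumes a running mgr module
instance `mgr` and an existing filesystem 'a'; not part of this patch):

    conn = ConnectionPool.Connection(mgr, 'a')
    conn.connect()                        # mounts libcephfs as root
    fs = conn.get_fs_handle()             # bumps ops_in_progress, refreshes last_used
    try:
        fs.statfs(b'/')                   # any libcephfs call
    finally:
        conn.put_fs_handle(notify=lambda: None)
    conn.del_fs_handle(waiter=None)       # disconnect(), or abort() if the fs is gone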
+ class RTimer(Timer):
+ """
+ recurring timer variant of Timer
+ """
+ @no_type_check
+ def run(self):
+ try:
+ while not self.finished.is_set():
+ self.finished.wait(self.interval)
+ self.function(*self.args, **self.kwargs)
+ self.finished.set()
+ except Exception as e:
+ logger.error("ConnectionPool.RTimer: %s", e)
+ raise
+
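Unlike threading.Timer, which fires once, RTimer re-runs its callback every
`interval` seconds until cancelled. An illustrative (hypothetical) use:

    t = ConnectionPool.RTimer(30.0, lambda: logger.debug("tick"))
    t.start()     # invokes the callback every 30 seconds
    t.cancel()    # sets `finished`, which ends the loop in run()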
+ # TODO: make this configurable
+ TIMER_TASK_RUN_INTERVAL = 30.0 # seconds
+ CONNECTION_IDLE_INTERVAL = 60.0 # seconds
+
+ def __init__(self, mgr):
+ self.mgr = mgr
+ self.connections = {}
+ self.lock = Lock()
+ self.cond = Condition(self.lock)
+ self.timer_task = ConnectionPool.RTimer(
+ ConnectionPool.TIMER_TASK_RUN_INTERVAL,
+ self.cleanup_connections)
+ self.timer_task.start()
+
+ def cleanup_connections(self):
+ with self.lock:
+            logger.info("scanning for idle connections...")
+ idle_fs = [fs_name for fs_name, conn in self.connections.items()
+ if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
+ for fs_name in idle_fs:
+ logger.info("cleaning up connection for '{}'".format(fs_name))
+ self._del_fs_handle(fs_name)
+
+ def get_fs_handle(self, fs_name):
+ with self.lock:
+ conn = None
+ try:
+ conn = self.connections.get(fs_name, None)
+ if conn:
+ if conn.is_connection_valid():
+ return conn.get_fs_handle()
+ else:
+ # filesystem id changed beneath us (or the filesystem does not exist).
+ # this is possible if the filesystem got removed (and recreated with
+ # same name) via "ceph fs rm/new" mon command.
+ logger.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
+ self._del_fs_handle(fs_name)
+ conn = ConnectionPool.Connection(self.mgr, fs_name)
+ conn.connect()
+ except cephfs.Error as e:
+ # try to provide a better error string if possible
+ if e.args[0] == errno.ENOENT:
+ raise CephfsConnectionException(
+ -errno.ENOENT, "FS '{0}' not found".format(fs_name))
+ raise CephfsConnectionException(-e.args[0], e.args[1])
+ self.connections[fs_name] = conn
+ return conn.get_fs_handle()
+
+ def put_fs_handle(self, fs_name):
+ with self.lock:
+ conn = self.connections.get(fs_name, None)
+ if conn:
+                conn.put_fs_handle(notify=lambda: self.cond.notify_all())
+
+ def _del_fs_handle(self, fs_name, wait=False):
+ conn = self.connections.pop(fs_name, None)
+ if conn:
+ conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+
+ def del_fs_handle(self, fs_name, wait=False):
+ with self.lock:
+ self._del_fs_handle(fs_name, wait)
+
+ def del_all_handles(self):
+ with self.lock:
+ for fs_name in list(self.connections.keys()):
+ logger.info("waiting for pending ops for '{}'".format(fs_name))
+ self._del_fs_handle(fs_name, wait=True)
+ logger.info("pending ops completed for '{}'".format(fs_name))
+        # no new connections should have been initialized, since connection
+        # creation is guarded during shutdown.
+ assert len(self.connections) == 0
+
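A minimal sketch of the pool's get/put contract (hypothetical caller): every
get_fs_handle() must be paired with a put_fs_handle() so that idle cleanup and
del_all_handles() can make progress:

    pool = ConnectionPool(mgr)            # assumes a mgr module instance
    fs = pool.get_fs_handle('a')          # connects on first use, then reuses
    try:
        fs.statfs(b'/')
    finally:
        pool.put_fs_handle('a')           # wakes waiters in del_fs_handle(wait=True)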
+
+class CephfsClient(object):
+ def __init__(self, mgr):
+ self.mgr = mgr
+ self.stopping = Event()
+        self.connection_pool = ConnectionPool(self.mgr)
+
+    def is_stopping(self):
+        # open_filesystem() below calls this on the client it is handed
+        return self.stopping.is_set()
+
+ def shutdown(self):
+ logger.info("shutting down")
+ # first, note that we're shutting down
+ self.stopping.set()
+ # second, delete all libcephfs handles from connection pool
+ self.connection_pool.del_all_handles()
+
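Consumers subclass CephfsClient, as VolumeClient does later in this patch; a
minimal hypothetical subclass combining it with open_filesystem() below:

    class MyHelper(CephfsClient):         # hypothetical name
        def fs_statfs(self, fs_name):
            with open_filesystem(self, fs_name) as fs_handle:
                return fs_handle.statfs(b'/')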
+
+@contextlib.contextmanager
+def open_filesystem(fsc, fs_name):
+ """
+    Open a filesystem with shared access.
+    This API is to be used as a context manager.
+
+    :param fsc: cephfs client instance
+    :param fs_name: filesystem name
+    :return: yields a fs handle (ceph filesystem handle)
+ """
+ if fsc.is_stopping():
+ raise CephfsConnectionException(-errno.ESHUTDOWN,
+ "shutdown in progress")
+
+ fs_handle = fsc.connection_pool.get_fs_handle(fs_name)
+ try:
+ yield fs_handle
+ finally:
+ fsc.connection_pool.put_fs_handle(fs_name)
+
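For example (hedged: `fsc` is any CephfsClient instance and 'a' an existing
filesystem); a CephfsConnectionException propagates if the client is shutting
down or the filesystem does not exist:

    with open_filesystem(fsc, 'a') as fs_handle:
        fs_handle.statfs(b'/')            # handle is returned to the pool on exit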
+
def colorize(msg, color, dark=False):
"""
Decorate `msg` with escape sequences to give the requested color
LD_LIBRARY_PATH = ../../../build/lib
deps =
cython
- -rrequirements.txt
+ -r requirements.txt
commands =
pytest --cov --cov-append --cov-report= --doctest-modules {posargs: \
mgr_util.py \
basepython = python3
deps =
cython
- -rrequirements.txt
+ -r requirements.txt
mypy==0.770
commands =
mypy --config-file=../../mypy.ini \
-import time
import errno
import logging
import sys
from typing import List
from contextlib import contextmanager
-from threading import Lock, Condition
-from typing import no_type_check
-if sys.version_info >= (3, 3):
- from threading import Timer
-else:
- from threading import _Timer as Timer
-
-import cephfs
import orchestrator
from .lock import GlobalLock
from ..exception import VolumeException
from ..fs_util import create_pool, remove_pool, create_filesystem, \
remove_filesystem, create_mds, volume_exists
+from mgr_util import open_filesystem
log = logging.getLogger(__name__)
-class ConnectionPool(object):
- class Connection(object):
- def __init__(self, mgr, fs_name):
- self.fs = None
- self.mgr = mgr
- self.fs_name = fs_name
- self.ops_in_progress = 0
- self.last_used = time.time()
- self.fs_id = self.get_fs_id()
-
- def get_fs_id(self):
- fs_map = self.mgr.get('fs_map')
- for fs in fs_map['filesystems']:
- if fs['mdsmap']['fs_name'] == self.fs_name:
- return fs['id']
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))
-
- def get_fs_handle(self):
- self.last_used = time.time()
- self.ops_in_progress += 1
- return self.fs
-
- def put_fs_handle(self, notify):
- assert self.ops_in_progress > 0
- self.ops_in_progress -= 1
- if self.ops_in_progress == 0:
- notify()
-
- def del_fs_handle(self, waiter):
- if waiter:
- while self.ops_in_progress != 0:
- waiter()
- if self.is_connection_valid():
- self.disconnect()
- else:
- self.abort()
-
- def is_connection_valid(self):
- fs_id = None
- try:
- fs_id = self.get_fs_id()
- except:
- # the filesystem does not exist now -- connection is not valid.
- pass
- log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
- return self.fs_id == fs_id
-
- def is_connection_idle(self, timeout):
- return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
-
- def connect(self):
- assert self.ops_in_progress == 0
- log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
- self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
- log.debug("Setting user ID and group ID of CephFS mount as root...")
- self.fs.conf_set("client_mount_uid", "0")
- self.fs.conf_set("client_mount_gid", "0")
- log.debug("CephFS initializing...")
- self.fs.init()
- log.debug("CephFS mounting...")
- self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
- log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
- self.mgr._ceph_register_client(self.fs.get_addrs())
-
- def disconnect(self):
- try:
- assert self.fs
- assert self.ops_in_progress == 0
- log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
- addrs = self.fs.get_addrs()
- self.fs.shutdown()
- self.mgr._ceph_unregister_client(addrs)
- self.fs = None
- except Exception as e:
- log.debug("disconnect: ({0})".format(e))
- raise
-
- def abort(self):
- assert self.fs
- assert self.ops_in_progress == 0
- log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
- self.fs.abort_conn()
- log.info("abort done from cephfs '{0}'".format(self.fs_name))
- self.fs = None
-
- class RTimer(Timer):
- """
- recurring timer variant of Timer
- """
- @no_type_check
- def run(self):
- try:
- while not self.finished.is_set():
- self.finished.wait(self.interval)
- self.function(*self.args, **self.kwargs)
- self.finished.set()
- except Exception as e:
- log.error("ConnectionPool.RTimer: %s", e)
- raise
-
- # TODO: make this configurable
- TIMER_TASK_RUN_INTERVAL = 30.0 # seconds
- CONNECTION_IDLE_INTERVAL = 60.0 # seconds
-
- def __init__(self, mgr):
- self.mgr = mgr
- self.connections = {}
- self.lock = Lock()
- self.cond = Condition(self.lock)
- self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
- self.cleanup_connections)
- self.timer_task.start()
-
- def cleanup_connections(self):
- with self.lock:
- log.info("scanning for idle connections..")
- idle_fs = [fs_name for fs_name,conn in self.connections.items()
- if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
- for fs_name in idle_fs:
- log.info("cleaning up connection for '{}'".format(fs_name))
- self._del_fs_handle(fs_name)
-
- def get_fs_handle(self, fs_name):
- with self.lock:
- conn = None
- try:
- conn = self.connections.get(fs_name, None)
- if conn:
- if conn.is_connection_valid():
- return conn.get_fs_handle()
- else:
- # filesystem id changed beneath us (or the filesystem does not exist).
- # this is possible if the filesystem got removed (and recreated with
- # same name) via "ceph fs rm/new" mon command.
- log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
- self._del_fs_handle(fs_name)
- conn = ConnectionPool.Connection(self.mgr, fs_name)
- conn.connect()
- except cephfs.Error as e:
- # try to provide a better error string if possible
- if e.args[0] == errno.ENOENT:
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
- raise VolumeException(-e.args[0], e.args[1])
- self.connections[fs_name] = conn
- return conn.get_fs_handle()
-
- def put_fs_handle(self, fs_name):
- with self.lock:
- conn = self.connections.get(fs_name, None)
- if conn:
- conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
-
- def _del_fs_handle(self, fs_name, wait=False):
- conn = self.connections.pop(fs_name, None)
- if conn:
- conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
-
- def del_fs_handle(self, fs_name, wait=False):
- with self.lock:
- self._del_fs_handle(fs_name, wait)
-
- def del_all_handles(self):
- with self.lock:
- for fs_name in list(self.connections.keys()):
- log.info("waiting for pending ops for '{}'".format(fs_name))
- self._del_fs_handle(fs_name, wait=True)
- log.info("pending ops completed for '{}'".format(fs_name))
- # no new connections should have been initialized since its
- # guarded on shutdown.
- assert len(self.connections) == 0
def gen_pool_names(volname):
"""
# create mds
return create_mds(mgr, volname, placement)
+
def delete_volume(mgr, volname, metadata_pool, data_pools):
"""
delete the given volume (tear down mds, remove filesystem, remove pools)
result_str = "metadata pool: {0} data pool: {1} removed".format(metadata_pool, str(data_pools))
return r, result_str, ""
+
def list_volumes(mgr):
"""
list all filesystem volumes.
result.append({'name': f['mdsmap']['fs_name']})
return result
+
@contextmanager
def open_volume(vc, volname):
"""
- open a volume for exclusive access. This API is to be used as a context manager.
+    open a volume for exclusive access. This API is to be used as a context
+ manager.
:param vc: volume client instance
:param volname: volume name
:return: yields a volume handle (ceph filesystem handle)
"""
- if vc.is_stopping():
- raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")
-
g_lock = GlobalLock()
- fs_handle = vc.connection_pool.get_fs_handle(volname)
- try:
- with g_lock.lock_op():
+ with g_lock.lock_op():
+ with open_filesystem(vc, volname) as fs_handle:
yield fs_handle
- finally:
- vc.connection_pool.put_fs_handle(volname)
+
@contextmanager
def open_volume_lockless(vc, volname):
"""
- open a volume with shared access. This API is to be used as a context manager.
+ open a volume with shared access. This API is to be used as a context
+ manager.
:param vc: volume client instance
:param volname: volume name
:return: yields a volume handle (ceph filesystem handle)
"""
- if vc.is_stopping():
- raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")
-
- fs_handle = vc.connection_pool.get_fs_handle(volname)
- try:
+ with open_filesystem(vc, volname) as fs_handle:
yield fs_handle
- finally:
- vc.connection_pool.put_fs_handle(volname)
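After this change both wrappers delegate to the shared open_filesystem(); they
differ only in that open_volume() also holds the GlobalLock. Illustrative
(hypothetical `vc`):

    with open_volume(vc, 'a') as fs_handle:            # exclusive: under GlobalLock
        fs_handle.statfs(b'/')
    with open_volume_lockless(vc, 'a') as fs_handle:   # shared: no GlobalLock
        fs_handle.statfs(b'/')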
import json
import errno
import logging
-from threading import Event
-import cephfs
+from mgr_util import CephfsClient
from .fs_util import listdir
-from .operations.volume import ConnectionPool, open_volume, create_volume, \
- delete_volume, list_volumes, get_pool_names
+from .operations.volume import create_volume, \
+ delete_volume, list_volumes, open_volume, get_pool_names
from .operations.group import open_group, create_group, remove_group, open_group_unique
from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
create_clone
except ValueError:
raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
+
def name_to_json(names):
"""
convert the list of names to json
namedict.append({'name': names[i].decode('utf-8')})
return json.dumps(namedict, indent=4, sort_keys=True)
-class VolumeClient(object):
+
+class VolumeClient(CephfsClient):
def __init__(self, mgr):
- self.mgr = mgr
- self.stopping = Event()
+ super().__init__(mgr)
# volume specification
self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
- self.connection_pool = ConnectionPool(self.mgr)
+ # TODO: make thread pool size configurable
self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
# on startup, queue purge job for available volumes to kickstart
return self.stopping.is_set()
def shutdown(self):
+ # Overrides CephfsClient.shutdown()
log.info("shutting down")
# first, note that we're shutting down
self.stopping.set()