git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/volumes: tie everything together to implement versioned subvolumes
author    Venky Shankar <vshankar@redhat.com>
          Wed, 20 Nov 2019 14:11:17 +0000 (09:11 -0500)
committer Ramana Raja <rraja@redhat.com>
          Wed, 12 Feb 2020 10:11:59 +0000 (05:11 -0500)
apart from the new way of provisioning subvolumes, this change makes
heavy use of context managers for volumes, groups and subvolumes.

this change classifies volumes, groups and subvolumes so that they are
treated as filesystem dentries and inodes. a "volume" can be thought of
as a dentry with "groups" as its entries (inodes). likewise, a "group"
is again a dentry with "subvolumes" as its entries (inodes). this is
built into the access mechanism as follows:

      with open_volume(...) as fs_handle:
          with open_group(fs_handle, ...) as group:
              with open_subvolume(group, ...) as subvolume:
                  # call subvolume object API
                  path = subvolume.getpath()

this way, a lot of redundant checks, such as verifying that a volume
or group exists before accessing a subvolume, are built right into the
access mechanism, with the added bonus of simple error handling.
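
as a rough illustration, such a context manager can be built on
contextlib. the sketch below is hypothetical -- the real open_volume
lives in operations/volume.py, which is not part of this diff -- but it
reuses the ConnectionPool conventions visible in volume.py below, where
a missing volume already surfaces as VolumeException(-errno.ENOENT, ...):

      from contextlib import contextmanager

      @contextmanager
      def open_volume(vc, volname):
          # get_fs_handle() raises VolumeException(-errno.ENOENT, ...) when
          # the volume does not exist, so callers need no explicit existence
          # check before entering the block.
          fs_handle = vc.connection_pool.get_fs_handle(volname)
          try:
              yield fs_handle
          finally:
              # hand the connection back to the pool even on error
              vc.connection_pool.put_fs_handle(volname)

open_group() and open_subvol() nest the same way, so a single except
VolumeException clause covers a missing volume, group or subvolume.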

Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit 9b87bd72093eaecda5e2ffc3ed32e482431f9842)

src/pybind/mgr/volumes/fs/purge_queue.py
src/pybind/mgr/volumes/fs/subvolspec.py [deleted file]
src/pybind/mgr/volumes/fs/volume.py
src/pybind/mgr/volumes/module.py

diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py
index d0814ca7817f3cb8ce4121e2309cc6756bb7e2fb..7c079250d6e21c46caa637d59b265ea894d91f8d 100644 (file)
@@ -1,10 +1,15 @@
 import sys
 import time
+import errno
 import logging
 import threading
 import traceback
 from collections import deque
 
+from .exception import VolumeException
+from .operations.volume import open_volume, open_volume_lockless
+from .operations.trash import open_trashcan
+
 log = logging.getLogger(__name__)
 
 class PurgeQueueBase(object):
@@ -138,20 +143,32 @@ class PurgeQueueBase(object):
         log.debug("fetching trash entry for volume '{0}'".format(volname))
 
         exclude_entries = [v[0] for v in self.jobs[volname]]
-        ret = self.vc.get_subvolume_trash_entry(
-            None, vol_name=volname, exclude_entries=exclude_entries)
-        if not ret[0] == 0:
-            log.error("error fetching trash entry for volume '{0}': {1}".format(volname), ret[0])
-            return ret[0], None
-        return 0, ret[1]
+        fs_handle = None
+        try:
+            with open_volume(self.vc, volname) as fs_handle:
+                with open_trashcan(fs_handle, self.vc.volspec) as trashcan:
+                    path = trashcan.get_trash_entry(exclude_entries)
+                    ret = 0, path
+        except VolumeException as ve:
+            if ve.errno == -errno.ENOENT and fs_handle:
+                ret = 0, None
+            else:
+                log.error("error fetching trash entry for volume '{0}' ({1})".format(volname), ve)
+                ret = ve.errno, None
+        return ret
 
     def purge_trash_entry_for_volume(self, volname, purge_dir):
         log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))
 
+        ret = 0
         thread_id = threading.currentThread()
-        ret = self.vc.purge_subvolume_trash_entry(
-            None, vol_name=volname, purge_dir=purge_dir, should_cancel=lambda: thread_id.should_cancel())
-        return ret[0]
+        try:
+            with open_volume_lockless(self.vc, volname) as fs_handle:
+                with open_trashcan(fs_handle, self.vc.volspec) as trashcan:
+                    trashcan.purge(purge_dir, should_cancel=lambda: thread_id.should_cancel())
+        except VolumeException as ve:
+            ret = ve.errno
+        return ret
 
 class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
     """
@@ -223,7 +240,7 @@ class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
                 self.register_job(volname, purge_dir)
             ret = self.purge_trash_entry_for_volume(volname, purge_dir)
             if ret != 0:
-                log.warn("failed to purge {0}.{1}".format(volname, purge_dir))
+                log.warn("failed to purge {0}.{1} ({2})".format(volname, purge_dir, ret))
             with self.lock:
                 self.unregister_job(volname, purge_dir)
             time.sleep(1)
diff --git a/src/pybind/mgr/volumes/fs/subvolspec.py b/src/pybind/mgr/volumes/fs/subvolspec.py
deleted file mode 100644 (file)
index 6c7df55..0000000
+++ /dev/null
@@ -1,114 +0,0 @@
-import os
-import uuid
-
-class SubvolumeSpec(object):
-    """
-    Specification of a subvolume, identified by (subvolume-id, group-id) tuple. Add fields as
-    required...
-    """
-
-    # where shall we (by default) create subvolumes
-    DEFAULT_SUBVOL_PREFIX = "/volumes"
-    # and the default namespace
-    DEFAULT_NS_PREFIX = "fsvolumens_"
-
-    # Reserved subvolume group name which we use in paths for subvolumes
-    # that are not assigned to a group (i.e. created with group=None)
-    NO_GROUP_NAME = "_nogroup"
-
-    def __init__(self, subvolumeid, groupid, subvolume_prefix=None, pool_ns_prefix=None):
-        assert groupid != SubvolumeSpec.NO_GROUP_NAME
-
-        self.subvolumeid = subvolumeid
-        self.groupid = groupid if groupid is not None else SubvolumeSpec.NO_GROUP_NAME
-        self.subvolume_prefix = subvolume_prefix if subvolume_prefix else SubvolumeSpec.DEFAULT_SUBVOL_PREFIX
-        self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else SubvolumeSpec.DEFAULT_NS_PREFIX
-
-    def is_default_group(self):
-        """
-        Is the group the default group?
-        """
-        return self.groupid == SubvolumeSpec.NO_GROUP_NAME
-
-    @property
-    def subvolume_id(self):
-        """
-        Return the subvolume-id from the subvolume specification
-        """
-        return self.subvolumeid
-
-    @property
-    def group_id(self):
-        """
-        Return the group-id from the subvolume specification
-        """
-        return self.groupid
-
-    @property
-    def subvolume_path(self):
-        """
-        return the subvolume path from subvolume specification
-        """
-        return os.path.join(self.group_path, self.subvolumeid.encode('utf-8'))
-
-    @property
-    def group_path(self):
-        """
-        return the group path from subvolume specification
-        """
-        return os.path.join(self.subvolume_prefix.encode('utf-8'), self.groupid.encode('utf-8'))
-
-    @property
-    def trash_path(self):
-        """
-        return the trash path from subvolume specification
-        """
-        return os.path.join(self.subvolume_prefix.encode('utf-8'), b"_deleting", self.subvolumeid.encode('utf-8'))
-
-    @property
-    def unique_trash_path(self):
-        """
-        return a unique trash directory entry path
-        """
-        return os.path.join(self.subvolume_prefix.encode('utf-8'), b"_deleting", str(uuid.uuid4()).encode('utf-8'))
-
-    @property
-    def fs_namespace(self):
-        """
-        return a filesystem namespace by stashing pool namespace prefix and subvolume-id
-        """
-        return "{0}{1}".format(self.pool_ns_prefix, self.subvolumeid)
-
-    @property
-    def trash_dir(self):
-        """
-        return the trash directory path
-        """
-        return os.path.join(self.subvolume_prefix.encode('utf-8'), b"_deleting")
-
-    def make_subvol_snap_path(self, snapdir, snapname):
-        """
-        return the subvolume snapshot path for a given snapshot name
-        """
-        return os.path.join(self.subvolume_path, snapdir.encode('utf-8'), snapname.encode('utf-8'))
-
-    def make_subvol_snapdir_path(self, snapdir):
-        """
-        return the subvolume snapdir path
-        """
-        return os.path.join(self.subvolume_path, snapdir.encode('utf-8'))
-
-    def make_group_snap_path(self, snapdir, snapname):
-        """
-        return the group snapshot path for a given snapshot name
-        """
-        return os.path.join(self.group_path, snapdir.encode('utf-8'), snapname.encode('utf-8'))
-
-    def make_group_snapdir_path(self, snapdir):
-        """
-        return the group's snapdir path
-        """
-        return os.path.join(self.group_path, snapdir.encode('utf-8'))
-
-    def __str__(self):
-        return "{0}/{1}".format(self.groupid, self.subvolumeid)
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index 9a31dd35313f8d5dab75d2ffa4f7d55cc31cb296..d287e8d55965f8b9b9dbc059caf38e27a3bb41bd 100644 (file)
 import json
-import time
 import errno
 import logging
-from threading import Lock, Condition, Event
-try:
-    # py2
-    from threading import _Timer as Timer
-except ImportError:
-    #py3
-    from threading import Timer
+from threading import Event
 
 import cephfs
-import orchestrator
 
-from .subvolspec import SubvolumeSpec
-from .subvolume import SubVolume
+from .fs_util import listdir
+
+from .operations.volume import ConnectionPool, open_volume, create_volume, \
+    delete_volume, list_volumes
+from .operations.group import open_group, create_group, remove_group
+from .operations.subvolume import open_subvol, create_subvol, remove_subvol
+
+from .vol_spec import VolSpec
 from .exception import VolumeException
 from .purge_queue import ThreadPoolPurgeQueueMixin
 
 log = logging.getLogger(__name__)
 
-class ConnectionPool(object):
-    class Connection(object):
-        def __init__(self, mgr, fs_name):
-            self.fs = None
-            self.mgr = mgr
-            self.fs_name = fs_name
-            self.ops_in_progress = 0
-            self.last_used = time.time()
-            self.fs_id = self.get_fs_id()
-
-        def get_fs_id(self):
-            fs_map = self.mgr.get('fs_map')
-            for fs in fs_map['filesystems']:
-                if fs['mdsmap']['fs_name'] == self.fs_name:
-                    return fs['id']
-            raise VolumeException(
-                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))
-
-        def get_fs_handle(self):
-            self.last_used = time.time()
-            self.ops_in_progress += 1
-            return self.fs
-
-        def put_fs_handle(self, notify):
-            assert self.ops_in_progress > 0
-            self.ops_in_progress -= 1
-            if self.ops_in_progress == 0:
-                notify()
-
-        def del_fs_handle(self, waiter):
-            if waiter:
-                while self.ops_in_progress != 0:
-                    waiter()
-            if self.is_connection_valid():
-                self.disconnect()
-            else:
-                self.abort()
-
-        def is_connection_valid(self):
-            fs_id = None
-            try:
-                fs_id = self.get_fs_id()
-            except:
-                # the filesystem does not exist now -- connection is not valid.
-                pass
-            return self.fs_id == fs_id
-
-        def is_connection_idle(self, timeout):
-            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
-
-        def connect(self):
-            assert self.ops_in_progress == 0
-            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
-            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
-            log.debug("Setting user ID and group ID of CephFS mount as root...")
-            self.fs.conf_set("client_mount_uid", "0")
-            self.fs.conf_set("client_mount_gid", "0")
-            log.debug("CephFS initializing...")
-            self.fs.init()
-            log.debug("CephFS mounting...")
-            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
-            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
-
-        def disconnect(self):
-            assert self.ops_in_progress == 0
-            log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
-            self.fs.shutdown()
-            self.fs = None
-
-        def abort(self):
-            assert self.ops_in_progress == 0
-            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
-            self.fs.abort_conn()
-            self.fs = None
-
-    class RTimer(Timer):
-        """
-        recurring timer variant of Timer
-        """
-        def run(self):
-            try:
-                while not self.finished.is_set():
-                    self.finished.wait(self.interval)
-                    self.function(*self.args, **self.kwargs)
-                self.finished.set()
-            except Exception as e:
-                log.error("ConnectionPool.RTimer: %s", e)
-                raise
-
-    # TODO: make this configurable
-    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
-    CONNECTION_IDLE_INTERVAL = 60.0 # seconds
-
-    def __init__(self, mgr):
-        self.mgr = mgr
-        self.connections = {}
-        self.lock = Lock()
-        self.cond = Condition(self.lock)
-        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
-                                                self.cleanup_connections)
-        self.timer_task.start()
-
-    def cleanup_connections(self):
-        with self.lock:
-            log.info("scanning for idle connections..")
-            idle_fs = [fs_name for fs_name,conn in self.connections.items()
-                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
-            for fs_name in idle_fs:
-                log.info("cleaning up connection for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name)
-
-    def get_fs_handle(self, fs_name):
-        with self.lock:
-            conn = None
-            try:
-                conn = self.connections.get(fs_name, None)
-                if conn:
-                    if conn.is_connection_valid():
-                        return conn.get_fs_handle()
-                    else:
-                        # filesystem id changed beneath us (or the filesystem does not exist).
-                        # this is possible if the filesystem got removed (and recreated with
-                        # same name) via "ceph fs rm/new" mon command.
-                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
-                        self._del_fs_handle(fs_name)
-                conn = ConnectionPool.Connection(self.mgr, fs_name)
-                conn.connect()
-            except cephfs.Error as e:
-                # try to provide a better error string if possible
-                if e.args[0] == errno.ENOENT:
-                    raise VolumeException(
-                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
-                raise VolumeException(-e.args[0], e.args[1])
-            self.connections[fs_name] = conn
-            return conn.get_fs_handle()
-
-    def put_fs_handle(self, fs_name):
-        with self.lock:
-            conn = self.connections.get(fs_name, None)
-            if conn:
-                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
-
-    def _del_fs_handle(self, fs_name, wait=False):
-        conn = self.connections.pop(fs_name, None)
-        if conn:
-            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
-
-    def del_fs_handle(self, fs_name, wait=False):
-        with self.lock:
-            self._del_fs_handle(fs_name, wait)
-
-    def del_all_handles(self):
-        with self.lock:
-            for fs_name in list(self.connections.keys()):
-                log.info("waiting for pending ops for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name, wait=True)
-                log.info("pending ops completed for '{}'".format(fs_name))
-            # no new connections should have been initialized since its
-            # guarded on shutdown.
-            assert len(self.connections) == 0
+def octal_str_to_decimal_int(mode):
+    try:
+        return int(mode, 8)
+    except ValueError:
+        raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
+
+def name_to_json(names):
+    """
+    convert the list of names to json
+    """
+    namedict = []
+    for i in range(len(names)):
+        namedict.append({'name': names[i].decode('utf-8')})
+    return json.dumps(namedict, indent=4, sort_keys=True)
 
 class VolumeClient(object):
     def __init__(self, mgr):
         self.mgr = mgr
         self.stopping = Event()
+        # volume specification
+        self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
         self.connection_pool = ConnectionPool(self.mgr)
         # TODO: make thread pool size configurable
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
@@ -197,6 +51,9 @@ class VolumeClient(object):
         for fs in fs_map['filesystems']:
             self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])
 
+    def is_stopping(self):
+        return self.stopping.isSet()
+
     def shutdown(self):
         log.info("shutting down")
         # first, note that we're shutting down
@@ -214,110 +71,21 @@ class VolumeClient(object):
             lvl = self.mgr.CLUSTER_LOG_PRIO_WARN
         self.mgr.cluster_log("cluster", lvl, msg)
 
-    def gen_pool_names(self, volname):
-        """
-        return metadata and data pool name (from a filesystem/volume name) as a tuple
-        """
-        return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
-
-    def get_fs(self, fs_name):
-        fs_map = self.mgr.get('fs_map')
-        for fs in fs_map['filesystems']:
-            if fs['mdsmap']['fs_name'] == fs_name:
-                return fs
-        return None
-
-    def get_mds_names(self, fs_name):
-        fs = self.get_fs(fs_name)
-        if fs is None:
-            return []
-        return [mds['name'] for mds in fs['mdsmap']['info'].values()]
-
-    def volume_exists(self, volname):
-        return self.get_fs(volname) is not None
-
     def volume_exception_to_retval(self, ve):
         """
         return a tuple representation from a volume exception
         """
         return ve.to_tuple()
 
-    def create_pool(self, pool_name, pg_num):
-        # create the given pool
-        command = {'prefix': 'osd pool create', 'pool': pool_name, 'pg_num': pg_num}
-        r, outb, outs = self.mgr.mon_command(command)
-        return r, outb, outs
-
-    def remove_pool(self, pool_name):
-        command = {'prefix': 'osd pool rm', 'pool': pool_name, 'pool2': pool_name,
-                   'yes_i_really_really_mean_it': True}
-        return self.mgr.mon_command(command)
-
-    def create_filesystem(self, fs_name, metadata_pool, data_pool):
-        command = {'prefix': 'fs new', 'fs_name': fs_name, 'metadata': metadata_pool,
-                   'data': data_pool}
-        return self.mgr.mon_command(command)
-
-    def remove_filesystem(self, fs_name):
-        command = {'prefix': 'fs fail', 'fs_name': fs_name}
-        r, outb, outs = self.mgr.mon_command(command)
-        if r != 0:
-            return r, outb, outs
-
-        command = {'prefix': 'fs rm', 'fs_name': fs_name, 'yes_i_really_mean_it': True}
-        return self.mgr.mon_command(command)
-
-    def create_mds(self, fs_name):
-        spec = orchestrator.StatelessServiceSpec()
-        spec.name = fs_name
-        try:
-            completion = self.mgr.add_stateless_service("mds", spec)
-            self.mgr._orchestrator_wait([completion])
-            orchestrator.raise_if_exception(completion)
-        except (ImportError, orchestrator.OrchestratorError):
-            return 0, "", "Volume created successfully (no MDS daemons created)"
-        except Exception as e:
-            # Don't let detailed orchestrator exceptions (python backtraces)
-            # bubble out to the user
-            log.exception("Failed to create MDS daemons")
-            return -errno.EINVAL, "", str(e)
-        return 0, "", ""
-
     ### volume operations -- create, rm, ls
 
-    def create_volume(self, volname):
-        """
-        create volume  (pool, filesystem and mds)
-        """
-        if self.stopping.isSet():
+    def create_fs_volume(self, volname):
+        if self.is_stopping():
             return -errno.ESHUTDOWN, "", "shutdown in progress"
+        return create_volume(self.mgr, volname)
 
-        metadata_pool, data_pool = self.gen_pool_names(volname)
-        # create pools
-        r, outs, outb = self.create_pool(metadata_pool, 16)
-        if r != 0:
-            return r, outb, outs
-        r, outb, outs = self.create_pool(data_pool, 8)
-        if r != 0:
-            #cleanup
-            self.remove_pool(metadata_pool)
-            return r, outb, outs
-        # create filesystem
-        r, outb, outs = self.create_filesystem(volname, metadata_pool, data_pool)
-        if r != 0:
-            log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
-            #cleanup
-            self.remove_pool(data_pool)
-            self.remove_pool(metadata_pool)
-            return r, outb, outs
-        # create mds
-        return self.create_mds(volname)
-
-    def delete_volume(self, volname, confirm):
-        """
-        delete the given module (tear down mds, remove filesystem)
-        """
-        if self.stopping.isSet():
+    def delete_fs_volume(self, volname, confirm):
+        if self.is_stopping():
             return -errno.ESHUTDOWN, "", "shutdown in progress"
 
         if confirm != "--yes-i-really-mean-it":
@@ -328,230 +96,131 @@ class VolumeClient(object):
 
         self.purge_queue.cancel_purge_job(volname)
         self.connection_pool.del_fs_handle(volname, wait=True)
-        # Tear down MDS daemons
-        try:
-            completion = self.mgr.remove_stateless_service("mds", volname)
-            self.mgr._orchestrator_wait([completion])
-            orchestrator.raise_if_exception(completion)
-        except (ImportError, orchestrator.OrchestratorError):
-            log.warning("OrchestratorError, not tearing down MDS daemons")
-        except Exception as e:
-            # Don't let detailed orchestrator exceptions (python backtraces)
-            # bubble out to the user
-            log.exception("Failed to tear down MDS daemons")
-            return -errno.EINVAL, "", str(e)
-
-        # In case orchestrator didn't tear down MDS daemons cleanly, or
-        # there was no orchestrator, we force the daemons down.
-        if self.volume_exists(volname):
-            r, outb, outs = self.remove_filesystem(volname)
-            if r != 0:
-                return r, outb, outs
-        else:
-            err = "Filesystem not found for volume '{0}'".format(volname)
-            log.warning(err)
-            return -errno.ENOENT, "", err
-        metadata_pool, data_pool = self.gen_pool_names(volname)
-        r, outb, outs = self.remove_pool(metadata_pool)
-        if r != 0:
-            return r, outb, outs
-        return self.remove_pool(data_pool)
-
-    def list_volumes(self):
+        return delete_volume(self.mgr, volname)
+
+    def list_fs_volumes(self):
         if self.stopping.isSet():
             return -errno.ESHUTDOWN, "", "shutdown in progress"
+        volumes = list_volumes(self.mgr)
+        return 0, json.dumps(volumes, indent=4, sort_keys=True), ""
 
-        result = []
-        fs_map = self.mgr.get("fs_map")
-        for f in fs_map['filesystems']:
-            result.append({'name': f['mdsmap']['fs_name']})
-        return 0, json.dumps(result, indent=2), ""
+    ### subvolume operations
 
-    def group_exists(self, sv, spec):
-        # default group need not be explicitly created (as it gets created
-        # at the time of subvolume, snapshot and other create operations).
-        return spec.is_default_group() or sv.get_group_path(spec)
+    def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs):
+        size       = kwargs['size']
+        pool       = kwargs['pool_layout']
+        uid        = kwargs['uid']
+        gid        = kwargs['gid']
+        mode       = kwargs['mode']
 
-    @staticmethod
-    def octal_str_to_decimal_int(mode):
+        oct_mode = octal_str_to_decimal_int(mode)
         try:
-            return int(mode, 8)
-        except ValueError:
-            raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
-
-    def connection_pool_wrap(func):
-        """
-        decorator that wraps subvolume calls by fetching filesystem handle
-        from the connection pool when fs_handle argument is empty, otherwise
-        just invoke func with the passed in filesystem handle. Also handles
-        call made to non-existent volumes (only when fs_handle is empty).
-        """
-        def conn_wrapper(self, fs_handle, **kwargs):
-            fs_h = fs_handle
-            fs_name = kwargs['vol_name']
-            # note that force arg is available for remove type commands
-            force = kwargs.get('force', False)
-
-            if self.stopping.isSet():
-                return -errno.ESHUTDOWN, "", "shutdown in progress"
-
-            # fetch the connection from the pool
-            if not fs_handle:
-                try:
-                    fs_h = self.connection_pool.get_fs_handle(fs_name)
-                except VolumeException as ve:
-                    if not force:
-                        return self.volume_exception_to_retval(ve)
-                    return 0, "", ""
-
-            # invoke the actual routine w/ fs handle
-            result = func(self, fs_h, **kwargs)
-
-            # hand over the connection back to the pool
-            if fs_h:
-                self.connection_pool.put_fs_handle(fs_name)
-            return result
-        return conn_wrapper
-
-    def nametojson(self, names):
-        """
-        convert the list of names to json
-        """
-
-        namedict = []
-        for i in range(len(names)):
-            namedict.append({'name': names[i].decode('utf-8')})
-        return json.dumps(namedict, indent=2)
-
-    ### subvolume operations
+            create_subvol(
+                fs_handle, self.volspec, group, subvolname, size, False, pool, oct_mode, uid, gid)
+        except VolumeException as ve:
+            # kick the purge threads for async removal -- note that this
+            # assumes the subvolume is moved to the trash can for cleanup on error.
+            self.purge_queue.queue_purge_job(volname)
+            raise ve
 
-    @connection_pool_wrap
-    def create_subvolume(self, fs_handle, **kwargs):
+    def create_subvolume(self, **kwargs):
         ret        = 0, "", ""
         volname    = kwargs['vol_name']
         subvolname = kwargs['sub_name']
         groupname  = kwargs['group_name']
-        size       = kwargs['size']
-        pool       = kwargs['pool_layout']
-        uid        = kwargs['uid']
-        gid        = kwargs['gid']
-        mode       = kwargs['mode']
 
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(subvolname, groupname)
-                if not self.group_exists(sv, spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found, create it with " \
-                        "`ceph fs subvolumegroup create` before creating subvolumes".format(groupname))
-                sv.create_subvolume(spec, size, pool=pool, uid=uid, gid=gid, mode=self.octal_str_to_decimal_int(mode))
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    try:
+                        with open_subvol(fs_handle, self.volspec, group, subvolname):
+                            # idempotent creation -- valid.
+                            pass
+                    except VolumeException as ve:
+                        if ve.errno == -errno.ENOENT:
+                            self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)
+                        else:
+                            raise
         except VolumeException as ve:
+            # volume/group does not exist or subvolume creation failed
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def remove_subvolume(self, fs_handle, **kwargs):
+    def remove_subvolume(self, **kwargs):
         ret        = 0, "", ""
         volname    = kwargs['vol_name']
         subvolname = kwargs['sub_name']
         groupname  = kwargs['group_name']
         force      = kwargs['force']
+
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(subvolname, groupname)
-                if self.group_exists(sv, spec):
-                    sv.remove_subvolume(spec, force)
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    remove_subvol(fs_handle, self.volspec, group, subvolname)
+                    # kick the purge threads for async removal -- note that this
+                    # assumes the subvolume is moved to the trash can.
+                    # TODO: make the purge queue a singleton so that the
+                    # trash can kicks the purge threads on dump.
                     self.purge_queue.queue_purge_job(volname)
-                elif not force:
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
-                        "subvolume '{1}'".format(groupname, subvolname))
         except VolumeException as ve:
-            ret = self.volume_exception_to_retval(ve)
+            if not (ve.errno == -errno.ENOENT and force):
+                ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def resize_subvolume(self, fs_handle, **kwargs):
+    def resize_subvolume(self, **kwargs):
         ret        = 0, "", ""
+        volname    = kwargs['vol_name']
         subvolname = kwargs['sub_name']
         newsize    = kwargs['new_size']
+        noshrink   = kwargs['no_shrink']
         groupname  = kwargs['group_name']
 
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(subvolname, groupname)
-                if not self.group_exists(sv, spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found, create it with " \
-                        "'ceph fs subvolumegroup create' before creating or resizing subvolumes".format(groupname))
-                subvolpath = sv.get_subvolume_path(spec)
-                if not subvolpath:
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume '{0}' not found, create it with " \
-                        "'ceph fs subvolume create' before resizing subvolumes".format(subvolname))
-
-                try:
-                    newsize = int(newsize)
-                except ValueError:
-                    newsize = newsize.lower()
-                    nsize, usedbytes = sv.resize_infinite(subvolpath, newsize)
-                    ret = 0, json.dumps([{'bytes_used': usedbytes}, {'bytes_quota': nsize}, {'bytes_pcent': "undefined"}], indent=2), ""
-                else:
-                    noshrink = kwargs['no_shrink']
-                    nsize, usedbytes = sv.resize_subvolume(subvolpath, newsize, noshrink)
-                    ret = 0, json.dumps([{'bytes_used': usedbytes}, {'bytes_quota': nsize},
-                                         {'bytes_pcent': '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}], indent=2), ""
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                        nsize, usedbytes = subvolume.resize(newsize, noshrink)
+                        ret = 0, json.dumps(
+                            [{'bytes_used': usedbytes},{'bytes_quota': nsize},
+                             {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}],
+                            indent=4, sort_keys=True), ""
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def subvolume_getpath(self, fs_handle, **kwargs):
+    def subvolume_getpath(self, **kwargs):
         ret        = None
         volname    = kwargs['vol_name']
         subvolname = kwargs['sub_name']
         groupname  = kwargs['group_name']
+
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(subvolname, groupname)
-                if not self.group_exists(sv, spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
-                path = sv.get_subvolume_path(spec)
-                if not path:
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume '{0}' not found".format(subvolname))
-                ret = 0, path.decode("utf-8"), ""
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                        subvolpath = subvolume.path
+                        ret = 0, subvolpath.decode("utf-8"), ""
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def list_subvolumes(self, fs_handle, **kwargs):
+    def list_subvolumes(self, **kwargs):
         ret        = 0, "", ""
+        volname    = kwargs['vol_name']
         groupname  = kwargs['group_name']
 
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(None, groupname)
-                if not self.group_exists(sv, spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
-                path = sv.get_group_path(spec)
-                # When default subvolume group is not yet created we just return an empty list.
-                if path is None:
-                    ret = 0, '[]', ""
-                else:
-                    subvolumes = sv.get_dir_entries(path)
-                    ret = 0, self.nametojson(subvolumes), ""
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    subvolumes = group.list_subvolumes()
+                    ret = 0, name_to_json(subvolumes), ""
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
     ### subvolume snapshot
 
-    @connection_pool_wrap
-    def create_subvolume_snapshot(self, fs_handle, **kwargs):
+    def create_subvolume_snapshot(self, **kwargs):
         ret        = 0, "", ""
         volname    = kwargs['vol_name']
         subvolname = kwargs['sub_name']
@@ -559,78 +228,53 @@ class VolumeClient(object):
         groupname  = kwargs['group_name']
 
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(subvolname, groupname)
-                if not self.group_exists(sv, spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot create " \
-                        "snapshot '{1}'".format(groupname, snapname))
-                if not sv.get_subvolume_path(spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume '{0}' not found, cannot create snapshot " \
-                        "'{1}'".format(subvolname, snapname))
-                sv.create_subvolume_snapshot(spec, snapname)
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                        subvolume.create_snapshot(snapname)
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def remove_subvolume_snapshot(self, fs_handle, **kwargs):
+    def remove_subvolume_snapshot(self, **kwargs):
         ret        = 0, "", ""
         volname    = kwargs['vol_name']
         subvolname = kwargs['sub_name']
         snapname   = kwargs['snap_name']
         groupname  = kwargs['group_name']
         force      = kwargs['force']
+
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(subvolname, groupname)
-                if self.group_exists(sv, spec):
-                    if sv.get_subvolume_path(spec):
-                        sv.remove_subvolume_snapshot(spec, snapname, force)
-                    elif not force:
-                        raise VolumeException(
-                            -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
-                            "subvolume snapshot '{1}'".format(subvolname, snapname))
-                elif not force:
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
-                        "remove subvolume snapshot '{1}'".format(groupname, snapname))
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                        subvolume.remove_snapshot(snapname)
         except VolumeException as ve:
-            ret = self.volume_exception_to_retval(ve)
+            if not (ve.errno == -errno.ENOENT and force):
+                ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def list_subvolume_snapshots(self, fs_handle, **kwargs):
+    def list_subvolume_snapshots(self, **kwargs):
         ret        = 0, "", ""
+        volname    = kwargs['vol_name']
         subvolname = kwargs['sub_name']
         groupname  = kwargs['group_name']
 
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(subvolname, groupname)
-                if not self.group_exists(sv, spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
-
-                if sv.get_subvolume_path(spec) == None:
-                    raise VolumeException(-errno.ENOENT,
-                                          "Subvolume '{0}' not found".format(subvolname))
-
-                path = spec.make_subvol_snapdir_path(self.mgr.rados.conf_get('client_snapdir'))
-                snapshots = sv.get_dir_entries(path)
-                ret = 0, self.nametojson(snapshots), ""
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+                        snapshots = subvolume.list_snapshots()
+                        ret = 0, name_to_json(snapshots), ""
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-
     ### group operations
 
-    @connection_pool_wrap
-    def create_subvolume_group(self, fs_handle, **kwargs):
+    def create_subvolume_group(self, **kwargs):
         ret       = 0, "", ""
-        volname   = kwargs['vol_name']
+        volname    = kwargs['vol_name']
         groupname = kwargs['group_name']
         pool      = kwargs['pool_layout']
         uid       = kwargs['uid']
@@ -638,140 +282,101 @@ class VolumeClient(object):
         mode      = kwargs['mode']
 
         try:
-            # TODO: validate that subvol size fits in volume size
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec("", groupname)
-                sv.create_group(spec, pool=pool, uid=uid, gid=gid, mode=self.octal_str_to_decimal_int(mode))
+            with open_volume(self, volname) as fs_handle:
+                try:
+                    with open_group(fs_handle, self.volspec, groupname):
+                        # idempotent creation -- valid.
+                        pass
+                except VolumeException as ve:
+                    if ve.errno == -errno.ENOENT:
+                        oct_mode = octal_str_to_decimal_int(mode)
+                        create_group(fs_handle, self.volspec, groupname, pool, oct_mode, uid, gid)
+                    else:
+                        raise
         except VolumeException as ve:
+            # volume does not exist or subvolume group creation failed
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def remove_subvolume_group(self, fs_handle, **kwargs):
+    def remove_subvolume_group(self, **kwargs):
         ret       = 0, "", ""
-        volname   = kwargs['vol_name']
+        volname    = kwargs['vol_name']
         groupname = kwargs['group_name']
         force     = kwargs['force']
+
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                # TODO: check whether there are no subvolumes in the group
-                spec = SubvolumeSpec("", groupname)
-                sv.remove_group(spec, force)
+            with open_volume(self, volname) as fs_handle:
+                remove_group(fs_handle, self.volspec, groupname)
         except VolumeException as ve:
-            ret = self.volume_exception_to_retval(ve)
+            if not (ve.errno == -errno.ENOENT and force):
+                ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def getpath_subvolume_group(self, fs_handle, **kwargs):
+    def getpath_subvolume_group(self, **kwargs):
+        volname    = kwargs['vol_name']
         groupname  = kwargs['group_name']
+
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec("", groupname)
-                path = sv.get_group_path(spec)
-                if path is None:
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
-                return 0, path.decode("utf-8"), ""
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    return 0, group.path.decode('utf-8'), ""
         except VolumeException as ve:
             return self.volume_exception_to_retval(ve)
 
-    @connection_pool_wrap
-    def list_subvolume_groups(self, fs_handle, **kwargs):
-        ret = 0, "", ""
-
+    def list_subvolume_groups(self, **kwargs):
+        volname = kwargs['vol_name']
+        ret     = 0, '[]', ""
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                subvolumegroups = sv.get_dir_entries(SubvolumeSpec.DEFAULT_SUBVOL_PREFIX)
-                ret = 0, self.nametojson(subvolumegroups), ""
+            with open_volume(self, volname) as fs_handle:
+                groups = listdir(fs_handle, self.volspec.base_dir)
+                ret = 0, name_to_json(groups), ""
         except VolumeException as ve:
-            ret = self.volume_exception_to_retval(ve)
+            if not ve.errno == -errno.ENOENT:
+                ret = self.volume_exception_to_retval(ve)
         return ret
 
     ### group snapshot
 
-    @connection_pool_wrap
-    def create_subvolume_group_snapshot(self, fs_handle, **kwargs):
+    def create_subvolume_group_snapshot(self, **kwargs):
         ret       = 0, "", ""
         volname   = kwargs['vol_name']
         groupname = kwargs['group_name']
         snapname  = kwargs['snap_name']
+
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec("", groupname)
-                if not self.group_exists(sv, spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot create " \
-                        "snapshot '{1}'".format(groupname, snapname))
-                sv.create_group_snapshot(spec, snapname)
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    group.create_snapshot(snapname)
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def remove_subvolume_group_snapshot(self, fs_handle, **kwargs):
+    def remove_subvolume_group_snapshot(self, **kwargs):
         ret       = 0, "", ""
         volname   = kwargs['vol_name']
         groupname = kwargs['group_name']
         snapname  = kwargs['snap_name']
         force     = kwargs['force']
-        try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec("", groupname)
-                if self.group_exists(sv, spec):
-                    sv.remove_group_snapshot(spec, snapname, force)
-                elif not force:
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
-                        "remove it".format(groupname))
-        except VolumeException as ve:
-            ret = self.volume_exception_to_retval(ve)
-        return ret
-
-    @connection_pool_wrap
-    def list_subvolume_group_snapshots(self, fs_handle, **kwargs):
-        ret        = 0, "", ""
-        groupname  = kwargs['group_name']
 
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(None, groupname)
-                if not self.group_exists(sv, spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
-
-                path = spec.make_group_snapdir_path(self.mgr.rados.conf_get('client_snapdir'))
-                snapshots = sv.get_dir_entries(path)
-                ret = 0, self.nametojson(snapshots), ""
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    group.remove_snapshot(snapname)
         except VolumeException as ve:
-            ret = self.volume_exception_to_retval(ve)
-        return ret
-
-    @connection_pool_wrap
-    def get_subvolume_trash_entry(self, fs_handle, **kwargs):
-        ret = None
-        volname = kwargs['vol_name']
-        exclude = kwargs.get('exclude_entries', [])
-
-        try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec("", "")
-                path = sv.get_trash_entry(spec, exclude)
-                ret = 0, path, ""
-        except VolumeException as ve:
-            ret = self.volume_exception_to_retval(ve)
+            if not (ve.errno == -errno.ENOENT and force):
+                ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def purge_subvolume_trash_entry(self, fs_handle, **kwargs):
-        ret = 0, "", ""
-        volname = kwargs['vol_name']
-        purge_dir = kwargs['purge_dir']
-        should_cancel = kwargs.get('should_cancel', lambda: False)
+    def list_subvolume_group_snapshots(self, **kwargs):
+        ret       = 0, "", ""
+        volname   = kwargs['vol_name']
+        groupname = kwargs['group_name']
 
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(purge_dir.decode('utf-8'), "")
-                sv.purge_subvolume(spec, should_cancel)
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    snapshots = group.list_snapshots()
+                    ret = 0, name_to_json(snapshots), ""
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py
index 95ecf2ea5fdbee431536e4aa37af6fe3d1fd1798..e38b805b8bf9bd17cc9b63504dc4250e9b9156fe 100644 (file)
@@ -202,41 +202,41 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
 
     def _cmd_fs_volume_create(self, inbuf, cmd):
         vol_id = cmd['name']
-        return self.vc.create_volume(vol_id)
+        return self.vc.create_fs_volume(vol_id)
 
     def _cmd_fs_volume_rm(self, inbuf, cmd):
         vol_name = cmd['vol_name']
         confirm = cmd.get('yes-i-really-mean-it', None)
-        return self.vc.delete_volume(vol_name, confirm)
+        return self.vc.delete_fs_volume(vol_name, confirm)
 
     def _cmd_fs_volume_ls(self, inbuf, cmd):
-        return self.vc.list_volumes()
+        return self.vc.list_fs_volumes()
 
     def _cmd_fs_subvolumegroup_create(self, inbuf, cmd):
         """
         :return: a 3-tuple of return code(int), empty string(str), error message (str)
         """
         return self.vc.create_subvolume_group(
-            None, vol_name=cmd['vol_name'], group_name=cmd['group_name'],
-            pool_layout=cmd.get('pool_layout', None), uid=cmd.get('uid', None),
-            gid=cmd.get('gid', None), mode=cmd.get('mode', '755'))
+            vol_name=cmd['vol_name'], group_name=cmd['group_name'],
+            pool_layout=cmd.get('pool_layout', None), mode=cmd.get('mode', '755'),
+            uid=cmd.get('uid', None), gid=cmd.get('gid', None))
 
     def _cmd_fs_subvolumegroup_rm(self, inbuf, cmd):
         """
         :return: a 3-tuple of return code(int), empty string(str), error message (str)
         """
-        return self.vc.remove_subvolume_group(None, vol_name=cmd['vol_name'],
+        return self.vc.remove_subvolume_group(vol_name=cmd['vol_name'],
                                               group_name=cmd['group_name'],
                                               force=cmd.get('force', False))
 
     def _cmd_fs_subvolumegroup_ls(self, inbuf, cmd):
-        return self.vc.list_subvolume_groups(None, vol_name=cmd['vol_name'])
+        return self.vc.list_subvolume_groups(vol_name=cmd['vol_name'])
 
     def _cmd_fs_subvolume_create(self, inbuf, cmd):
         """
         :return: a 3-tuple of return code(int), empty string(str), error message (str)
         """
-        return self.vc.create_subvolume(None, vol_name=cmd['vol_name'],
+        return self.vc.create_subvolume(vol_name=cmd['vol_name'],
                                         sub_name=cmd['sub_name'],
                                         group_name=cmd.get('group_name', None),
                                         size=cmd.get('size', None),
@@ -249,60 +249,58 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         """
         :return: a 3-tuple of return code(int), empty string(str), error message (str)
         """
-        return self.vc.remove_subvolume(None, vol_name=cmd['vol_name'],
+        return self.vc.remove_subvolume(vol_name=cmd['vol_name'],
                                         sub_name=cmd['sub_name'],
                                         group_name=cmd.get('group_name', None),
                                         force=cmd.get('force', False))
 
     def _cmd_fs_subvolume_ls(self, inbuf, cmd):
-        return self.vc.list_subvolumes(None, vol_name=cmd['vol_name'],
+        return self.vc.list_subvolumes(vol_name=cmd['vol_name'],
                                        group_name=cmd.get('group_name', None))
 
     def _cmd_fs_subvolumegroup_getpath(self, inbuf, cmd):
         return self.vc.getpath_subvolume_group(
-                None, vol_name=cmd['vol_name'], group_name=cmd['group_name'])
+            vol_name=cmd['vol_name'], group_name=cmd['group_name'])
 
     def _cmd_fs_subvolume_getpath(self, inbuf, cmd):
-        return self.vc.subvolume_getpath(None, vol_name=cmd['vol_name'],
+        return self.vc.subvolume_getpath(vol_name=cmd['vol_name'],
                                          sub_name=cmd['sub_name'],
                                          group_name=cmd.get('group_name', None))
 
     def _cmd_fs_subvolumegroup_snapshot_create(self, inbuf, cmd):
-        return self.vc.create_subvolume_group_snapshot(None, vol_name=cmd['vol_name'],
+        return self.vc.create_subvolume_group_snapshot(vol_name=cmd['vol_name'],
                                                        group_name=cmd['group_name'],
                                                        snap_name=cmd['snap_name'])
 
     def _cmd_fs_subvolumegroup_snapshot_rm(self, inbuf, cmd):
-        return self.vc.remove_subvolume_group_snapshot(None, vol_name=cmd['vol_name'],
+        return self.vc.remove_subvolume_group_snapshot(vol_name=cmd['vol_name'],
                                                        group_name=cmd['group_name'],
                                                        snap_name=cmd['snap_name'],
                                                        force=cmd.get('force', False))
 
     def _cmd_fs_subvolumegroup_snapshot_ls(self, inbuf, cmd):
-        return self.vc.list_subvolume_group_snapshots(None, vol_name=cmd['vol_name'],
+        return self.vc.list_subvolume_group_snapshots(vol_name=cmd['vol_name'],
                                                       group_name=cmd['group_name'])
 
     def _cmd_fs_subvolume_snapshot_create(self, inbuf, cmd):
-        return self.vc.create_subvolume_snapshot(None, vol_name=cmd['vol_name'],
+        return self.vc.create_subvolume_snapshot(vol_name=cmd['vol_name'],
                                                  sub_name=cmd['sub_name'],
                                                  snap_name=cmd['snap_name'],
                                                  group_name=cmd.get('group_name', None))
 
     def _cmd_fs_subvolume_snapshot_rm(self, inbuf, cmd):
-        return self.vc.remove_subvolume_snapshot(None, vol_name=cmd['vol_name'],
+        return self.vc.remove_subvolume_snapshot(vol_name=cmd['vol_name'],
                                                  sub_name=cmd['sub_name'],
                                                  snap_name=cmd['snap_name'],
                                                  group_name=cmd.get('group_name', None),
                                                  force=cmd.get('force', False))
 
     def _cmd_fs_subvolume_snapshot_ls(self, inbuf, cmd):
-        return self.vc.list_subvolume_snapshots(None, vol_name=cmd['vol_name'],
+        return self.vc.list_subvolume_snapshots(vol_name=cmd['vol_name'],
                                                 sub_name=cmd['sub_name'],
                                                 group_name=cmd.get('group_name', None))
 
     def _cmd_fs_subvolume_resize(self, inbuf, cmd):
-        return self.vc.resize_subvolume(None, vol_name=cmd['vol_name'],
-                                        sub_name=cmd['sub_name'],
-                                        new_size=cmd['new_size'],
-                                        group_name=cmd.get('group_name', None),
+        return self.vc.resize_subvolume(vol_name=cmd['vol_name'], sub_name=cmd['sub_name'],
+                                        new_size=cmd['new_size'], group_name=cmd.get('group_name', None),
                                         no_shrink=cmd.get('no_shrink', False))
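
all handlers above keep the module's 3-tuple return convention of
(return code, stdout, stderr), with volume_exception_to_retval()
bridging exceptions into it via VolumeException.to_tuple(). a minimal
sketch of that exception type, assuming to_tuple() simply unpacks the
negative errno and message every raise site in this diff constructs it
with (exception.py is not shown here):

      class VolumeException(Exception):
          def __init__(self, error_code, error_message):
              self.errno = error_code         # negative errno, e.g. -errno.ENOENT
              self.error_str = error_message

          def to_tuple(self):
              # the (return code, stdout, stderr) triple that mgr commands return
              return self.errno, "", self.error_str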