]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr / volumes: maintain connection pool for fs volumes
authorVenky Shankar <vshankar@redhat.com>
Mon, 17 Jun 2019 09:36:54 +0000 (05:36 -0400)
committerVenky Shankar <vshankar@redhat.com>
Mon, 8 Jul 2019 03:58:16 +0000 (23:58 -0400)
Right now every [sub]volume call does a connect/disconnect to the
cephfs filesystem. This is unnecessary and can be optimized by
caching the filesystem handle in a connection pool and (re)using
the handle for subsequent [sub]volume operations.

This would be useful for implementing features such as purge queue
for asynchronous subvolume deletes.

Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/pybind/mgr/volumes/fs/subvolume.py
src/pybind/mgr/volumes/fs/volume.py
src/pybind/mgr/volumes/module.py

index 016b00fa69daf33be277aafba80e4dbd68d64bf3..ce5142d3784fadd407fa1769f844a38f305bcf91 100644 (file)
@@ -33,11 +33,8 @@ class SubVolume(object):
     """
 
 
-    def __init__(self, mgr, fs_name=None):
-        self.fs = None
-        self.fs_name = fs_name
-        self.connected = False
-
+    def __init__(self, mgr, fs_handle):
+        self.fs = fs_handle
         self.rados = mgr.rados
 
     def _mkdir_p(self, path, mode=0o755):
@@ -261,29 +258,8 @@ class SubVolume(object):
 
     ### context manager routines
 
-    def connect(self):
-        log.debug("Connecting to cephfs...")
-        self.fs = cephfs.LibCephFS(rados_inst=self.rados)
-        log.debug("CephFS initializing...")
-        self.fs.init()
-        log.debug("CephFS mounting...")
-        self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
-        log.debug("Connection to cephfs complete")
-
-    def disconnect(self):
-        log.info("disconnect")
-        if self.fs:
-            log.debug("Disconnecting cephfs...")
-            self.fs.shutdown()
-            self.fs = None
-            log.debug("Disconnecting cephfs complete")
-
     def __enter__(self):
-        self.connect()
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        self.disconnect()
-
-    def __del__(self):
-        self.disconnect()
+        pass
index 8cc7ef26fa05883c8bb093097228e6266ddab736..c1ebe7e325149a0a7ea0f15ed04b1fc8bb480ef4 100644 (file)
@@ -1,6 +1,14 @@
 import json
+import time
 import errno
 import logging
+from threading import Lock
+try:
+    # py2
+    from threading import _Timer as Timer
+except ImportError:
+    #py3
+    from threading import Timer
 
 import cephfs
 import orchestrator
@@ -11,9 +19,147 @@ from .exception import VolumeException
 
 log = logging.getLogger(__name__)
 
+class ConnectionPool(object):
+    class Connection(object):
+        def __init__(self, mgr, fs_name):
+            self.fs = None
+            self.mgr = mgr
+            self.fs_name = fs_name
+            self.ops_in_progress = 0
+            self.last_used = time.time()
+            self.fs_id = self.get_fs_id()
+
+        def get_fs_id(self):
+            fs_map = self.mgr.get('fs_map')
+            for fs in fs_map['filesystems']:
+                if fs['mdsmap']['fs_name'] == self.fs_name:
+                    return fs['id']
+            raise VolumeException(
+                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))
+
+        def get_fs_handle(self):
+            self.last_used = time.time()
+            self.ops_in_progress += 1
+            return self.fs
+
+        def put_fs_handle(self):
+            assert self.ops_in_progress > 0
+            self.ops_in_progress -= 1
+
+        def del_fs_handle(self):
+            if self.is_connection_valid():
+                self.disconnect()
+            else:
+                self.abort()
+
+        def is_connection_valid(self):
+            fs_id = None
+            try:
+                fs_id = self.get_fs_id()
+            except:
+                # the filesystem does not exist now -- connection is not valid.
+                pass
+            return self.fs_id == fs_id
+
+        def is_connection_idle(self, timeout):
+            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
+
+        def connect(self):
+            assert self.ops_in_progress == 0
+            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
+            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
+            log.debug("CephFS initializing...")
+            self.fs.init()
+            log.debug("CephFS mounting...")
+            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
+            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
+
+        def disconnect(self):
+            assert self.ops_in_progress == 0
+            log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
+            self.fs.shutdown()
+            self.fs = None
+
+        def abort(self):
+            assert self.ops_in_progress == 0
+            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
+            self.fs.abort_conn()
+            self.fs = None
+
+    class RTimer(Timer):
+        """
+        recurring timer variant of Timer
+        """
+        def run(self):
+            while not self.finished.is_set():
+                self.finished.wait(self.interval)
+                self.function(*self.args, **self.kwargs)
+            self.finished.set()
+
+    # TODO: make this configurable
+    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
+    CONNECTION_IDLE_INTERVAL = 60.0 # seconds
+
+    def __init__(self, mgr):
+        self.mgr = mgr
+        self.connections = {}
+        self.lock = Lock()
+        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
+                                                self.cleanup_connections)
+        self.timer_task.start()
+
+    def cleanup_connections(self):
+        with self.lock:
+            log.info("scanning for idle connections..")
+            idle_fs = [fs_name for fs_name,conn in self.connections.iteritems()
+                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
+            for fs_name in idle_fs:
+                log.info("cleaning up connection for '{}'".format(fs_name))
+                self._del_fs_handle(fs_name)
+
+    def get_fs_handle(self, fs_name):
+        with self.lock:
+            conn = None
+            try:
+                conn = self.connections.get(fs_name, None)
+                if conn:
+                    if conn.is_connection_valid():
+                        return conn.get_fs_handle()
+                    else:
+                        # filesystem id changed beneath us (or the filesystem does not exist).
+                        # this is possible if the filesystem got removed (and recreated with
+                        # same name) via "ceph fs rm/new" mon command.
+                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
+                        self._del_fs_handle(fs_name)
+                conn = ConnectionPool.Connection(self.mgr, fs_name)
+                conn.connect()
+            except cephfs.Error as e:
+                # try to provide a better error string if possible
+                if e.args[0] == errno.ENOENT:
+                    raise VolumeException(
+                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
+                raise VolumeException(-e.args[0], e.args[1])
+            self.connections[fs_name] = conn
+            return conn.get_fs_handle()
+
+    def put_fs_handle(self, fs_name):
+        with self.lock:
+            conn = self.connections.get(fs_name, None)
+            if conn:
+                conn.put_fs_handle()
+
+    def _del_fs_handle(self, fs_name):
+        conn = self.connections.pop(fs_name, None)
+        if conn:
+            conn.del_fs_handle()
+    def del_fs_handle(self, fs_name):
+        with self.lock:
+            self._del_fs_handle(fs_name)
+
 class VolumeClient(object):
     def __init__(self, mgr):
         self.mgr = mgr
+        self.connection_pool = ConnectionPool(self.mgr)
 
     def gen_pool_names(self, volname):
         """
@@ -127,6 +273,7 @@ class VolumeClient(object):
         """
         delete the given module (tear down mds, remove filesystem)
         """
+        self.connection_pool.del_fs_handle(volname)
         # Tear down MDS daemons
         try:
             completion = self.mgr.remove_stateless_service("mds", volname)
@@ -176,16 +323,51 @@ class VolumeClient(object):
         except ValueError:
             raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
 
+    def connection_pool_wrap(func):
+        """
+        decorator that wraps subvolume calls by fetching filesystem handle
+        from the connection pool when fs_handle argument is empty, otherwise
+        just invoke func with the passed in filesystem handle. Also handles
+        call made to non-existent volumes (only when fs_handle is empty).
+        """
+        def conn_wrapper(self, fs_handle, **kwargs):
+            fs_h = fs_handle
+            fs_name = kwargs['vol_name']
+            # note that force arg is available for remove type commands
+            force = kwargs.get('force', False)
+
+            # fetch the connection from the pool
+            if not fs_handle:
+                try:
+                    fs_h = self.connection_pool.get_fs_handle(fs_name)
+                except VolumeException as ve:
+                    if not force:
+                        return self.volume_exception_to_retval(ve)
+                    return 0, "", ""
+
+            # invoke the actual routine w/ fs handle
+            result = func(self, fs_h, **kwargs)
+
+            # hand over the connection back to the pool
+            if fs_h:
+                self.connection_pool.put_fs_handle(fs_name)
+            return result
+        return conn_wrapper
+
     ### subvolume operations
 
-    def create_subvolume(self, volname, subvolname, groupname, size, mode='755', pool=None):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def create_subvolume(self, fs_handle, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
+        size       = kwargs['size']
+        pool       = kwargs['pool_layout']
+        mode       = kwargs['mode']
+
         try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \
-                    "volume create` before trying to create subvolumes".format(volname))
-            with SubVolume(self.mgr, fs_name=volname) as sv:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec(subvolname, groupname)
                 if not self.group_exists(sv, spec):
                     raise VolumeException(
@@ -196,36 +378,35 @@ class VolumeClient(object):
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def remove_subvolume(self, volname, subvolname, groupname, force):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def remove_subvolume(self, fs_handle, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
+        force      = kwargs['force']
         try:
-            fs = self.get_fs(volname)
-            if fs:
-                with SubVolume(self.mgr, fs_name=volname) as sv:
-                    spec = SubvolumeSpec(subvolname, groupname)
-                    if self.group_exists(sv, spec):
-                        sv.remove_subvolume(spec, force)
-                        sv.purge_subvolume(spec)
-                    elif not force:
-                        raise VolumeException(
-                            -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
-                            "subvolume '{1}'".format(groupname, subvolname))
-            elif not force:
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \
-                    "'{1}'".format(volname, subvolname))
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec(subvolname, groupname)
+                if self.group_exists(sv, spec):
+                    sv.remove_subvolume(spec, force)
+                    sv.purge_subvolume(spec)
+                elif not force:
+                    raise VolumeException(
+                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
+                        "subvolume '{1}'".format(groupname, subvolname))
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def subvolume_getpath(self, volname, subvolname, groupname):
-        ret = None
+    @connection_pool_wrap
+    def subvolume_getpath(self, fs_handle, **kwargs):
+        ret        = None
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
         try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found".format(volname))
-
-            with SubVolume(self.mgr, fs_name=volname) as sv:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec(subvolname, groupname)
                 if not self.group_exists(sv, spec):
                     raise VolumeException(
@@ -241,15 +422,16 @@ class VolumeClient(object):
 
     ### subvolume snapshot
 
-    def create_subvolume_snapshot(self, volname, subvolname, snapname, groupname):
-        ret = 0, "", ""
-        try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \
-                    "'{1}'".format(volname, snapname))
+    @connection_pool_wrap
+    def create_subvolume_snapshot(self, fs_handle, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        snapname   = kwargs['snap_name']
+        groupname  = kwargs['group_name']
 
-            with SubVolume(self.mgr, fs_name=volname) as sv:
+        try:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec(subvolname, groupname)
                 if not self.group_exists(sv, spec):
                     raise VolumeException(
@@ -264,76 +446,76 @@ class VolumeClient(object):
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def remove_subvolume_snapshot(self, volname, subvolname, snapname, groupname, force):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def remove_subvolume_snapshot(self, fs_handle, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        snapname   = kwargs['snap_name']
+        groupname  = kwargs['group_name']
+        force      = kwargs['force']
         try:
-            if self.volume_exists(volname):
-                with SubVolume(self.mgr, fs_name=volname) as sv:
-                    spec = SubvolumeSpec(subvolname, groupname)
-                    if self.group_exists(sv, spec):
-                        if sv.get_subvolume_path(spec):
-                            sv.remove_subvolume_snapshot(spec, snapname, force)
-                        elif not force:
-                            raise VolumeException(
-                                -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
-                                "subvolume snapshot '{1}'".format(subvolname, snapname))
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec(subvolname, groupname)
+                if self.group_exists(sv, spec):
+                    if sv.get_subvolume_path(spec):
+                        sv.remove_subvolume_snapshot(spec, snapname, force)
                     elif not force:
                         raise VolumeException(
-                            -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
-                            "remove subvolume snapshot '{1}'".format(groupname, snapname))
-            elif not force:
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \
-                    "snapshot '{1}'".format(volname, snapname))
+                            -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
+                            "subvolume snapshot '{1}'".format(subvolname, snapname))
+                elif not force:
+                    raise VolumeException(
+                        -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
+                        "remove subvolume snapshot '{1}'".format(groupname, snapname))
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
     ### group operations
 
-    def create_subvolume_group(self, volname, groupname, mode='755', pool=None):
-        ret = 0, "", ""
-        try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \
-                    "volume create` before trying to create subvolume groups".format(volname))
+    @connection_pool_wrap
+    def create_subvolume_group(self, fs_handle, **kwargs):
+        ret       = 0, "", ""
+        volname   = kwargs['vol_name']
+        groupname = kwargs['group_name']
+        pool      = kwargs['pool_layout']
+        mode      = kwargs['mode']
 
+        try:
             # TODO: validate that subvol size fits in volume size
-            with SubVolume(self.mgr, fs_name=volname) as sv:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec("", groupname)
                 sv.create_group(spec, pool=pool, mode=self.octal_str_to_decimal_int(mode))
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def remove_subvolume_group(self, volname, groupname, force):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def remove_subvolume_group(self, fs_handle, **kwargs):
+        ret       = 0, "", ""
+        volname   = kwargs['vol_name']
+        groupname = kwargs['group_name']
+        force     = kwargs['force']
         try:
-            if self.volume_exists(volname):
-                with SubVolume(self.mgr, fs_name=volname) as sv:
-                    # TODO: check whether there are no subvolumes in the group
-                    spec = SubvolumeSpec("", groupname)
-                    sv.remove_group(spec, force)
-            elif not force:
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \
-                    "group '{0}'".format(volname, groupname))
+            with SubVolume(self.mgr, fs_handle) as sv:
+                # TODO: check whether there are no subvolumes in the group
+                spec = SubvolumeSpec("", groupname)
+                sv.remove_group(spec, force)
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
     ### group snapshot
 
-    def create_subvolume_group_snapshot(self, volname, groupname, snapname):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def create_subvolume_group_snapshot(self, fs_handle, **kwargs):
+        ret       = 0, "", ""
+        volname   = kwargs['vol_name']
+        groupname = kwargs['group_name']
+        snapname  = kwargs['snap_name']
         try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \
-                    "'{1}'".format(volname, snapname))
-
-            with SubVolume(self.mgr, fs_name=volname) as sv:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec("", groupname)
                 if not self.group_exists(sv, spec):
                     raise VolumeException(
@@ -344,22 +526,22 @@ class VolumeClient(object):
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def remove_subvolume_group_snapshot(self, volname, groupname, snapname, force):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def remove_subvolume_group_snapshot(self, fs_handle, **kwargs):
+        ret       = 0, "", ""
+        volname   = kwargs['vol_name']
+        groupname = kwargs['group_name']
+        snapname  = kwargs['snap_name']
+        force     = kwargs['force']
         try:
-            if self.volume_exists(volname):
-                with SubVolume(self.mgr, fs_name=volname) as sv:
-                    spec = SubvolumeSpec("", groupname)
-                    if self.group_exists(sv, spec):
-                        sv.remove_group_snapshot(spec, snapname, force)
-                    elif not force:
-                        raise VolumeException(
-                            -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
-                            "remove it".format(groupname))
-            elif not force:
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \
-                    "snapshot '{1}'".format(volname, snapname))
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec("", groupname)
+                if self.group_exists(sv, spec):
+                    sv.remove_group_snapshot(spec, snapname, force)
+                elif not force:
+                    raise VolumeException(
+                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
+                        "remove it".format(groupname))
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
index 60d78a964959f53f29d46b95c617ed73ad8ce459..167b7e0044b17136cc9ca2dd75565275c9f85376 100644 (file)
@@ -199,82 +199,63 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         """
         :return: a 3-tuple of return code(int), empty string(str), error message (str)
         """
-        vol_name = cmd['vol_name']
-        group_name = cmd['group_name']
-        pool_layout = cmd.get('pool_layout', None)
-        mode = cmd.get('mode', '755')
-
-        return self.vc.create_subvolume_group(vol_name, group_name, mode=mode, pool=pool_layout)
+        return self.vc.create_subvolume_group(
+            None, vol_name=cmd['vol_name'], group_name=cmd['group_name'],
+            pool_layout=cmd.get('pool_layout', None), mode=cmd.get('mode', '755'))
 
     def _cmd_fs_subvolumegroup_rm(self, inbuf, cmd):
         """
         :return: a 3-tuple of return code(int), empty string(str), error message (str)
         """
-        vol_name = cmd['vol_name']
-        group_name = cmd['group_name']
-        force = cmd.get('force', False)
-
-        return self.vc.remove_subvolume_group(vol_name, group_name, force)
+        return self.vc.remove_subvolume_group(None, vol_name=cmd['vol_name'],
+                                              group_name=cmd['group_name'],
+                                              force=cmd.get('force', False))
 
     def _cmd_fs_subvolume_create(self, inbuf, cmd):
         """
         :return: a 3-tuple of return code(int), empty string(str), error message (str)
         """
-        vol_name = cmd['vol_name']
-        sub_name = cmd['sub_name']
-        size = cmd.get('size', None)
-        group_name = cmd.get('group_name', None)
-        pool_layout = cmd.get('pool_layout', None)
-        mode = cmd.get('mode', '755')
-
-        return self.vc.create_subvolume(vol_name, sub_name, group_name, size, mode=mode, pool=pool_layout)
+        return self.vc.create_subvolume(None, vol_name=cmd['vol_name'],
+                                        sub_name=cmd['sub_name'],
+                                        group_name=cmd.get('group_name', None),
+                                        size=cmd.get('size', None),
+                                        pool_layout=cmd.get('pool_layout', None),
+                                        mode=cmd.get('mode', '755'))
 
     def _cmd_fs_subvolume_rm(self, inbuf, cmd):
         """
         :return: a 3-tuple of return code(int), empty string(str), error message (str)
         """
-        vol_name = cmd['vol_name']
-        sub_name = cmd['sub_name']
-        force = cmd.get('force', False)
-        group_name = cmd.get('group_name', None)
-
-        return self.vc.remove_subvolume(vol_name, sub_name, group_name, force)
+        return self.vc.remove_subvolume(None, vol_name=cmd['vol_name'],
+                                        sub_name=cmd['sub_name'],
+                                        group_name=cmd.get('group_name', None),
+                                        force=cmd.get('force', False))
 
     def _cmd_fs_subvolume_getpath(self, inbuf, cmd):
-        vol_name = cmd['vol_name']
-        sub_name = cmd['sub_name']
-        group_name = cmd.get('group_name', None)
-
-        return self.vc.subvolume_getpath(vol_name, sub_name, group_name)
+        return self.vc.subvolume_getpath(None, vol_name=cmd['vol_name'],
+                                         sub_name=cmd['sub_name'],
+                                         group_name=cmd.get('group_name', None))
 
     def _cmd_fs_subvolumegroup_snapshot_create(self, inbuf, cmd):
-        vol_name = cmd['vol_name']
-        group_name = cmd['group_name']
-        snap_name = cmd['snap_name']
-
-        return self.vc.create_subvolume_group_snapshot(vol_name, group_name, snap_name)
+        return self.vc.create_subvolume_group_snapshot(None, vol_name=cmd['vol_name'],
+                                                       group_name=cmd['group_name'],
+                                                       snap_name=cmd['snap_name'])
 
     def _cmd_fs_subvolumegroup_snapshot_rm(self, inbuf, cmd):
-        vol_name = cmd['vol_name']
-        group_name = cmd['group_name']
-        snap_name = cmd['snap_name']
-        force = cmd.get('force', False)
-
-        return self.vc.remove_subvolume_group_snapshot(vol_name, group_name, snap_name, force)
+        return self.vc.remove_subvolume_group_snapshot(None, vol_name=cmd['vol_name'],
+                                                       group_name=cmd['group_name'],
+                                                       snap_name=cmd['snap_name'],
+                                                       force=cmd.get('force', False))
 
     def _cmd_fs_subvolume_snapshot_create(self, inbuf, cmd):
-        vol_name = cmd['vol_name']
-        sub_name = cmd['sub_name']
-        snap_name = cmd['snap_name']
-        group_name = cmd.get('group_name', None)
-
-        return self.vc.create_subvolume_snapshot(vol_name, sub_name, snap_name, group_name)
+        return self.vc.create_subvolume_snapshot(None, vol_name=cmd['vol_name'],
+                                                 sub_name=cmd['sub_name'],
+                                                 snap_name=cmd['snap_name'],
+                                                 group_name=cmd.get('group_name', None))
 
     def _cmd_fs_subvolume_snapshot_rm(self, inbuf, cmd):
-        vol_name = cmd['vol_name']
-        sub_name = cmd['sub_name']
-        snap_name = cmd['snap_name']
-        force = cmd.get('force', False)
-        group_name = cmd.get('group_name', None)
-
-        return self.vc.remove_subvolume_snapshot(vol_name, sub_name, snap_name, group_name, force)
+        return self.vc.remove_subvolume_snapshot(None, vol_name=cmd['vol_name'],
+                                                 sub_name=cmd['sub_name'],
+                                                 snap_name=cmd['snap_name'],
+                                                 group_name=cmd.get('group_name', None),
+                                                 force=cmd.get('force', False))