def __str__(self):
return "{0} ({1})".format(self.errno, self.error_str)
+
+class MetadataMgrException(Exception):
+ def __init__(self, error_code, error_message):
+ self.errno = error_code
+ self.error_str = error_message
+
+ def __str__(self):
+ return "{0} ({1})".format(self.errno, self.error_str)
--- /dev/null
+import errno
+import logging
+import importlib
+
+import cephfs
+
+from .subvolume_base import SubvolumeBase
+from ...exception import VolumeException, MetadataMgrException
+
+log = logging.getLogger(__name__)
+
+class SubvolumeLoader(object):
+ INVALID_VERSION = -1
+
+ SUPPORTED_MODULES = ['subvolume_v1.SubvolumeV1']
+
+ def __init__(self):
+ self.max_version = SubvolumeLoader.INVALID_VERSION
+ self.versions = {}
+
+ def _load_module(self, mod_cls):
+ mod_name, cls_name = mod_cls.split('.')
+ mod = importlib.import_module('.versions.{0}'.format(mod_name), package='volumes.fs.operations')
+ return getattr(mod, cls_name)
+
+ def _load_supported_versions(self):
+ for mod_cls in SubvolumeLoader.SUPPORTED_MODULES:
+ cls = self._load_module(mod_cls)
+ log.info("loaded v{0} subvolume".format(cls.version()))
+ if self.max_version is not None or cls.version() > self.max_version:
+ self.max_version = cls.version()
+ self.versions[cls.version()] = cls
+ if self.max_version == SubvolumeLoader.INVALID_VERSION:
+ raise VolumeException("no subvolume version available")
+ log.info("max subvolume version is v{0}".format(self.max_version))
+
+ def _get_subvolume_version(self, version):
+ try:
+ return self.versions[version]
+ except KeyError:
+ raise VolumeException(-errno.EINVAL, "subvolume class v{0} does not exist".format(version))
+
+ def get_subvolume_object_max(self, fs, vol_spec, group, subvolname):
+ return self._get_subvolume_version(self.max_version)(fs, vol_spec, group, subvolname)
+
+ def upgrade_legacy_subvolume(self, fs, subvolume):
+ assert subvolume.legacy_mode
+ try:
+ fs.mkdirs(subvolume.legacy_dir, 0o700)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], "error accessing subvolume")
+ qpath = subvolume.base_path.decode('utf-8')
+ subvolume.init_config(self.max_version, SubvolumeBase.SUBVOLUME_TYPE_NORMAL, qpath, "complete")
+
+ def get_subvolume_object(self, fs, vol_spec, group, subvolname, upgrade=True):
+ subvolume = SubvolumeBase(fs, vol_spec, group, subvolname)
+ try:
+ subvolume.discover()
+ version = int(subvolume.metadata_mgr.get_global_option('version'))
+ return self._get_subvolume_version(version)(fs, vol_spec, group, subvolname, legacy=subvolume.legacy_mode)
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT and upgrade:
+ self.upgrade_legacy_subvolume(fs, subvolume)
+ subvolume = None
+ return self.get_subvolume_object(fs, vol_spec, group, subvolname, upgrade=False)
+ else:
+ # log the actual error and generalize error string returned to user
+ log.error("error accessing subvolume metadata for '{0}' ({1})".format(subvolname, me))
+ raise VolumeException(-errno.EINVAL, "error accessing subvolume metadata")
+
+loaded_subvolumes = SubvolumeLoader()
+loaded_subvolumes._load_supported_versions()
--- /dev/null
+import os
+import errno
+import logging
+
+try:
+ import configparser
+except ImportError:
+ import ConfigParser as configparser
+
+try:
+ from StringIO import StringIO
+except ImportError:
+ from io import StringIO
+
+import cephfs
+
+from ...exception import MetadataMgrException
+
+log = logging.getLogger(__name__)
+
+class MetadataManager(object):
+ GLOBAL_SECTION = "GLOBAL"
+ GLOBAL_META_KEY_VERSION = "version"
+ GLOBAL_META_KEY_TYPE = "type"
+ GLOBAL_META_KEY_PATH = "path"
+ GLOBAL_META_KEY_STATE = "state"
+
+ MAX_IO_BYTES = 8 * 1024
+
+ def __init__(self, fs, config_path, mode):
+ self.fs = fs
+ self.mode = mode
+ self.config_path = config_path
+ self.config = configparser.ConfigParser()
+
+ def refresh(self):
+ fd = None
+ conf_data = StringIO()
+ try:
+ log.debug("opening config {0}".format(self.config_path))
+ fd = self.fs.open(self.config_path, os.O_RDONLY)
+ while True:
+ data = self.fs.read(fd, -1, MetadataManager.MAX_IO_BYTES)
+ if not len(data):
+ break
+ conf_data.write(data.decode('utf-8'))
+ conf_data.seek(0)
+ self.config.readfp(conf_data)
+ except cephfs.ObjectNotFound:
+ raise MetadataMgrException(-errno.ENOENT, "metadata config '{0}' not found".format(self.config_path))
+ except cephfs.Error as e:
+ raise MetadataMgrException(-e.args[0], e.args[1])
+ finally:
+ if fd is not None:
+ self.fs.close(fd)
+
+ def flush(self):
+ conf_data = StringIO()
+ self.config.write(conf_data)
+ conf_data.seek(0)
+
+ fd = None
+ try:
+ fd = self.fs.open(self.config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, self.mode)
+ wrote = 0
+ while True:
+ data = conf_data.read()
+ if not len(data):
+ break
+ wrote += self.fs.write(fd, data.encode('utf-8'), -1)
+ self.fs.fsync(fd, 0)
+ log.info("wrote {0} bytes to config {1}".format(wrote, self.config_path))
+ except cephfs.Error as e:
+ raise MetadataMgrException(-e.args[0], e.args[1])
+ finally:
+ if fd is not None:
+ self.fs.close(fd)
+
+ def init(self, version, typ, path, state):
+ # you may init just once before refresh (helps to overwrite conf)
+ if self.config.has_section(MetadataManager.GLOBAL_SECTION):
+ raise MetadataMgrException(-errno.EINVAL, "init called on an existing config")
+
+ self.add_section(MetadataManager.GLOBAL_SECTION)
+ self.update_section_multi(
+ MetadataManager.GLOBAL_SECTION, {MetadataManager.GLOBAL_META_KEY_VERSION : str(version),
+ MetadataManager.GLOBAL_META_KEY_TYPE : str(typ),
+ MetadataManager.GLOBAL_META_KEY_PATH : str(path),
+ MetadataManager.GLOBAL_META_KEY_STATE : str(state)
+ })
+
+ def add_section(self, section):
+ try:
+ self.config.add_section(section)
+ except configparser.DuplicateSectionError:
+ return
+ except:
+ raise MetadataMgrException(-errno.EINVAL, "error adding section to config")
+
+ def remove_option(self, section, key):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ self.config.remove_option(section, key)
+
+ def remove_section(self, section):
+ self.config.remove_section(section)
+
+ def update_section(self, section, key, value):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ self.config.set(section, key, str(value))
+
+ def update_section_multi(self, section, dct):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ for key,value in dct.items():
+ self.config.set(section, key, str(value))
+
+ def get_option(self, section, key):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ if not self.config.has_option(section, key):
+ raise MetadataMgrException(-errno.ENOENT, "no config '{0}' in section '{1}'".format(key, section))
+ return self.config.get(section, key)
+
+ def get_global_option(self, key):
+ return self.get_option(MetadataManager.GLOBAL_SECTION, key)
--- /dev/null
+import os
+import uuid
+import errno
+import logging
+from hashlib import md5
+
+import cephfs
+
+from .metadata_manager import MetadataManager
+from ..trash import create_trashcan, open_trashcan
+from ...fs_util import get_ancestor_xattr
+from ...exception import MetadataMgrException, VolumeException
+
+log = logging.getLogger(__name__)
+
+class SubvolumeBase(object):
+ LEGACY_CONF_DIR = "_legacy"
+
+ SUBVOLUME_TYPE_NORMAL = "subvolume"
+
+ def __init__(self, fs, vol_spec, group, subvolname, legacy=False):
+ self.fs = fs
+ self.vol_spec = vol_spec
+ self.group = group
+ self.subvolname = subvolname
+ self.legacy_mode = legacy
+ self.load_config()
+
+ @property
+ def base_path(self):
+ return os.path.join(self.group.path, self.subvolname.encode('utf-8'))
+
+ @property
+ def config_path(self):
+ return os.path.join(self.base_path, b".meta")
+
+ @property
+ def legacy_dir(self):
+ return os.path.join(self.vol_spec.base_dir.encode('utf-8'), SubvolumeBase.LEGACY_CONF_DIR.encode('utf-8'))
+
+ @property
+ def legacy_config_path(self):
+ m = md5()
+ m.update(self.base_path)
+ meta_config = "{0}.meta".format(m.digest().hex())
+ return os.path.join(self.legacy_dir, meta_config.encode('utf-8'))
+
+ @property
+ def namespace(self):
+ return "{0}{1}".format(self.vol_spec.fs_namespace, self.subvolname)
+
+ @property
+ def legacy_mode(self):
+ return self.legacy
+
+ @legacy_mode.setter
+ def legacy_mode(self, mode):
+ self.legacy = mode
+
+ def load_config(self):
+ if self.legacy_mode:
+ self.metadata_mgr = MetadataManager(self.fs, self.legacy_config_path, 0o640)
+ else:
+ self.metadata_mgr = MetadataManager(self.fs, self.config_path, 0o640)
+
+ def _set_attrs(self, path, size, isolate_namespace, pool, mode, uid, gid):
+ # set size
+ if size is not None:
+ try:
+ self.fs.setxattr(path, 'ceph.quota.max_bytes', str(size).encode('utf-8'), 0)
+ except cephfs.InvalidValue as e:
+ raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(size))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ # set pool layout
+ if pool:
+ try:
+ self.fs.setxattr(path, 'ceph.dir.layout.pool', pool.encode('utf-8'), 0)
+ except cephfs.InvalidValue:
+ raise VolumeException(-errno.EINVAL,
+ "invalid pool layout '{0}' -- need a valid data pool".format(pool))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ # isolate namespace
+ xattr_key = xattr_val = None
+ if isolate_namespace:
+ # enforce security isolation, use separate namespace for this subvolume
+ xattr_key = 'ceph.dir.layout.pool_namespace'
+ xattr_val = self.namespace
+ elif not pool:
+ # If subvolume's namespace layout is not set, then the subvolume's pool
+ # layout remains unset and will undesirably change with ancestor's
+ # pool layout changes.
+ xattr_key = 'ceph.dir.layout.pool'
+ xattr_val = get_ancestor_xattr(self.fs, path, "ceph.dir.layout.pool")
+ if xattr_key and xattr_val:
+ try:
+ self.fs.setxattr(path, xattr_key, xattr_val.encode('utf-8'), 0)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ # set uid/gid
+ if uid is None:
+ uid = self.group.uid
+ else:
+ try:
+ uid = int(uid)
+ if uid < 0:
+ raise ValueError
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "invalid UID")
+ if gid is None:
+ gid = self.group.gid
+ else:
+ try:
+ gid = int(gid)
+ if gid < 0:
+ raise ValueError
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "invalid GID")
+ if uid is not None and gid is not None:
+ self.fs.chown(path, uid, gid)
+
+ def _resize(self, path, newsize, noshrink):
+ try:
+ newsize = int(newsize)
+ if newsize <= 0:
+ raise VolumeException(-errno.EINVAL, "Invalid subvolume size")
+ except ValueError:
+ newsize = newsize.lower()
+ if not (newsize == "inf" or newsize == "infinite"):
+ raise VolumeException(-errno.EINVAL, "invalid size option '{0}'".format(newsize))
+ newsize = 0
+ noshrink = False
+
+ try:
+ maxbytes = int(self.fs.getxattr(path, 'ceph.quota.max_bytes').decode('utf-8'))
+ except cephfs.NoData:
+ maxbytes = 0
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ subvolstat = self.fs.stat(path)
+ if newsize > 0 and newsize < subvolstat.st_size:
+ if noshrink:
+ raise VolumeException(-errno.EINVAL, "Can't resize the subvolume. The new size '{0}' would be lesser than the current "
+ "used size '{1}'".format(newsize, subvolstat.st_size))
+
+ if not newsize == maxbytes:
+ try:
+ self.fs.setxattr(path, 'ceph.quota.max_bytes', str(newsize).encode('utf-8'), 0)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], "Cannot set new size for the subvolume. '{0}'".format(e.args[1]))
+ return newsize, subvolstat.st_size
+
+ def init_config(self, version, subvolume_type, subvolume_path, subvolume_state):
+ self.metadata_mgr.init(version, subvolume_type, subvolume_path, subvolume_state)
+ self.metadata_mgr.flush()
+
+ def discover(self):
+ log.debug("discovering subvolume '{0}' [mode: {1}]".format(self.subvolname, "legacy" if self.legacy_mode else "new"))
+ try:
+ self.fs.stat(self.base_path)
+ self.metadata_mgr.refresh()
+ log.debug("loaded subvolume '{0}'".format(self.subvolname))
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT and not self.legacy_mode:
+ self.legacy_mode = True
+ self.load_config()
+ self.discover()
+ else:
+ raise
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
+ raise VolumeException(-e.args[0], "error accessing subvolume '{0}'".format(self.subvolname))
+
+ def trash_base_dir(self):
+ if self.legacy_mode:
+ self.fs.unlink(self.legacy_config_path)
+ subvol_path = self.base_path
+ create_trashcan(self.fs, self.vol_spec)
+ with open_trashcan(self.fs, self.vol_spec) as trashcan:
+ trashcan.dump(subvol_path)
+ log.info("subvolume with path '{0}' moved to trashcan".format(subvol_path))
+
+ def create_base_dir(self, mode):
+ try:
+ self.fs.mkdirs(self.base_path, mode)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
--- /dev/null
+import os
+import uuid
+import errno
+import logging
+
+import cephfs
+
+from .subvolume_base import SubvolumeBase
+from ..template import SubvolumeTemplate
+from ..snapshot_util import mksnap, rmsnap
+from ...exception import VolumeException, MetadataMgrException
+from ...fs_util import listdir
+
+log = logging.getLogger(__name__)
+
+class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
+ VERSION = 1
+
+ @staticmethod
+ def version():
+ return SubvolumeV1.VERSION
+
+ @property
+ def path(self):
+ try:
+ # no need to stat the path -- open() does that
+ return self.metadata_mgr.get_global_option('path').encode('utf-8')
+ except MetadataMgrException as me:
+ raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
+
+ def create(self, size, isolate_nspace, pool, mode, uid, gid):
+ subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+ try:
+ # create directory and set attributes
+ self.fs.mkdirs(subvol_path, mode)
+ self._set_attrs(subvol_path, size, isolate_nspace, pool, mode, uid, gid)
+
+ # persist subvolume metadata
+ qpath = subvol_path.decode('utf-8')
+ self.init_config(SubvolumeV1.VERSION, "subvolume", qpath, "complete")
+ except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+ try:
+ log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
+ self.remove()
+ except VolumeException as ve:
+ log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+ if isinstance(e, MetadataMgrException):
+ log.error("metadata manager exception: {0}".format(e))
+ e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
+ elif isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+ def open(self):
+ try:
+ self.metadata_mgr.refresh()
+ subvol_path = self.path
+ log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
+ self.fs.stat(subvol_path)
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
+ raise VolumeException(me.args[0], me.args[1])
+ except cephfs.ObjectNotFound:
+ log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname))
+ raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def remove(self):
+ self.trash_base_dir()
+
+ def resize(self, newsize, noshrink):
+ subvol_path = self.path
+ return self._resize(subvol_path, newsize, noshrink)
+
+ def create_snapshot(self, snapname):
+ snappath = os.path.join(self.path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+ snapname.encode('utf-8'))
+ mksnap(self.fs, snappath)
+
+ def remove_snapshot(self, snapname):
+ snappath = os.path.join(self.path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+ snapname.encode('utf-8'))
+ rmsnap(self.fs, snappath)
+
+ def list_snapshots(self):
+ try:
+ dirpath = os.path.join(self.path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
+ return listdir(self.fs, dirpath)
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ return []
+ raise