def get_config_key(self, key):
return self._ceph_cmd(['config-key', 'get', key])
+ @classmethod
+ def _cmd(cls, args):
+ return cls.mgr_cluster.admin_remote.run(args=args)
+
@classmethod
def _rbd_cmd(cls, cmd):
- args = [
- 'rbd'
- ]
+ args = ['rbd']
args.extend(cmd)
- cls.mgr_cluster.admin_remote.run(args=args)
+ cls._cmd(args)
@classmethod
def _radosgw_admin_cmd(cls, cmd):
- args = [
- 'radosgw-admin'
- ]
+ args = ['radosgw-admin']
args.extend(cmd)
- cls.mgr_cluster.admin_remote.run(args=args)
+ cls._cmd(args)
@classmethod
def _rados_cmd(cls, cmd):
args = ['rados']
args.extend(cmd)
- cls.mgr_cluster.admin_remote.run(args=args)
+ cls._cmd(args)
@classmethod
def mons(cls):
import six
-from .helper import DashboardTestCase
+from .helper import DashboardTestCase, JObj, JList, JLeaf
class CephfsTest(DashboardTestCase):
self.assertIsInstance(clients['data'], list)
self.assertToHave(clients, 'status')
self.assertIsInstance(clients['status'], int)
+
+ def test_ls_mk_rm_dir(self):
+ fs_id = self.fs.get_namespace_id()
+ data = self._get("/api/cephfs/{}/ls_dir".format(fs_id),
+ params={'path': '/'})
+ self.assertStatus(200)
+ self.assertIsInstance(data, list)
+ self.assertEqual(len(data), 0)
+
+ self._post("/api/cephfs/{}/mk_dirs".format(fs_id),
+ params={'path': '/pictures/birds'})
+ self.assertStatus(200)
+
+ data = self._get("/api/cephfs/{}/ls_dir".format(fs_id),
+ params={'path': '/pictures'})
+ self.assertStatus(200)
+ self.assertIsInstance(data, list)
+ self.assertEqual(len(data), 1)
+
+ self._post("/api/cephfs/{}/rm_dir".format(fs_id),
+ params={'path': '/pictures'})
+ self.assertStatus(500)
+ self._post("/api/cephfs/{}/rm_dir".format(fs_id),
+ params={'path': '/pictures/birds'})
+ self.assertStatus(200)
+ self._post("/api/cephfs/{}/rm_dir".format(fs_id),
+ params={'path': '/pictures'})
+ self.assertStatus(200)
+
+ data = self._get("/api/cephfs/{}/ls_dir".format(fs_id),
+ params={'path': '/'})
+ self.assertStatus(200)
+ self.assertIsInstance(data, list)
+ self.assertEqual(len(data), 0)
+
+ def test_snapshots(self):
+ fs_id = self.fs.get_namespace_id()
+ self._post("/api/cephfs/{}/mk_dirs".format(fs_id),
+ params={'path': '/movies/dune'})
+ self.assertStatus(200)
+
+ self._post("/api/cephfs/{}/mk_snapshot".format(fs_id),
+ params={'path': '/movies/dune', 'name': 'test'})
+ self.assertStatus(200)
+
+ data = self._get("/api/cephfs/{}/ls_dir".format(fs_id),
+ params={'path': '/movies'})
+ self.assertStatus(200)
+ self.assertIsInstance(data, list)
+ self.assertEqual(len(data), 1)
+ self.assertSchema(data[0], JObj(sub_elems={
+ 'name': JLeaf(str),
+ 'path': JLeaf(str),
+ 'snapshots': JList(JObj(sub_elems={
+ 'name': JLeaf(str),
+ 'path': JLeaf(str),
+ 'created': JLeaf(str)
+ })),
+ 'quotas': JObj(sub_elems={
+ 'max_bytes': JLeaf(int),
+ 'max_files': JLeaf(int)
+ })
+ }))
+ snapshots = data[0]['snapshots']
+ self.assertEqual(len(snapshots), 1)
+ snapshot = snapshots[0]
+ self.assertEqual(snapshot['name'], "test")
+ self.assertEqual(snapshot['path'], "/movies/dune/.snap/test")
+
+ self._post("/api/cephfs/{}/rm_snapshot".format(fs_id),
+ params={'path': '/movies/dune', 'name': 'test'})
+ self.assertStatus(200)
+
+ data = self._get("/api/cephfs/{}/ls_dir".format(fs_id),
+ params={'path': '/movies'})
+ self.assertStatus(200)
+ self.assertEqual(len(data[0]['snapshots']), 0)
+
+ # Cleanup. Note, the CephFS Python extension (and therefor the Dashoard
+ # REST API) does not support recursive deletion of a directory.
+ self._post("/api/cephfs/{}/rm_dir".format(fs_id),
+ params={'path': '/movies/dune'})
+ self.assertStatus(200)
+ self._post("/api/cephfs/{}/rm_dir".format(fs_id),
+ params={'path': '/movies'})
+ self.assertStatus(200)
from collections import defaultdict
+import os
+
import cherrypy
+import cephfs
from . import ApiController, RESTController, UiApiController
from .. import mgr
from ..exceptions import DashboardException
from ..security import Scope
+from ..services.cephfs import CephFS as CephFS_
from ..services.ceph_service import CephService
from ..tools import ViewCache
CephService.send_command('mds', 'client evict',
srv_spec='{0}:0'.format(fs_id), id=client_id)
+ @staticmethod
+ def _cephfs_instance(fs_id):
+ """
+ :param fs_id: The filesystem identifier.
+ :type fs_id: int | str
+ :return: A instance of the CephFS class.
+ """
+ fs_name = CephFS_.fs_name_from_id(fs_id)
+ if fs_name is None:
+ raise cherrypy.HTTPError(404, "CephFS id {} not found".format(fs_id))
+ return CephFS_(fs_name)
+
+ @RESTController.Resource('GET')
+ def ls_dir(self, fs_id, path=None):
+ """
+ List directories of specified path.
+ :param fs_id: The filesystem identifier.
+ :param path: The path where to start listing the directory content.
+ Defaults to '/' if not set.
+ :return: The names of the directories below the specified path.
+ :rtype: list
+ """
+ if path is None:
+ path = os.sep
+ else:
+ path = os.path.normpath(path)
+ try:
+ cfs = self._cephfs_instance(fs_id)
+ paths = cfs.ls_dir(path, 1)
+ # Convert (bytes => string), prettify paths (strip slashes)
+ # and append additional information.
+ paths = [{
+ 'name': os.path.basename(p.decode()),
+ 'path': p.decode(),
+ 'snapshots': cfs.ls_snapshots(p.decode()),
+ 'quotas': cfs.get_quotas(p.decode())
+ } for p in paths if p != path.encode()]
+ except (cephfs.PermissionError, cephfs.ObjectNotFound):
+ paths = []
+ return paths
+
+ @RESTController.Resource('POST')
+ def mk_dirs(self, fs_id, path):
+ """
+ Create a directory.
+ :param fs_id: The filesystem identifier.
+ :param path: The path of the directory.
+ """
+ cfs = self._cephfs_instance(fs_id)
+ cfs.mk_dirs(path)
+
+ @RESTController.Resource('POST')
+ def rm_dir(self, fs_id, path):
+ """
+ Remove a directory.
+ :param fs_id: The filesystem identifier.
+ :param path: The path of the directory.
+ """
+ cfs = self._cephfs_instance(fs_id)
+ cfs.rm_dir(path)
+
+ @RESTController.Resource('POST')
+ def mk_snapshot(self, fs_id, path, name=None):
+ """
+ Create a snapshot.
+ :param fs_id: The filesystem identifier.
+ :param path: The path of the directory.
+ :param name: The name of the snapshot. If not specified,
+ a name using the current time in RFC3339 UTC format
+ will be generated.
+ :return: The name of the snapshot.
+ :rtype: str
+ """
+ cfs = self._cephfs_instance(fs_id)
+ return cfs.mk_snapshot(path, name)
+
+ @RESTController.Resource('POST')
+ def rm_snapshot(self, fs_id, path, name):
+ """
+ Remove a snapshot.
+ :param fs_id: The filesystem identifier.
+ :param path: The path of the directory.
+ :param name: The name of the snapshot.
+ """
+ cfs = self._cephfs_instance(fs_id)
+ cfs.rm_snapshot(path, name)
+
+ @RESTController.Resource('GET')
+ def get_quotas(self, fs_id, path):
+ """
+ Get the quotas of the specified path.
+ :param fs_id: The filesystem identifier.
+ :param path: The path of the directory/file.
+ :return: Returns a dictionary containing 'max_bytes'
+ and 'max_files'.
+ :rtype: dict
+ """
+ cfs = self._cephfs_instance(fs_id)
+ return cfs.get_quotas(path)
+
+ @RESTController.Resource('POST')
+ def set_quotas(self, fs_id, path, max_bytes, max_files):
+ """
+ Set the quotas of the specified path.
+ :param fs_id: The filesystem identifier.
+ :param path: The path of the directory/file.
+ :param max_bytes: The byte limit.
+ :param max_files: The file limit.
+ """
+ cfs = self._cephfs_instance(fs_id)
+ return cfs.set_quotas(path, max_bytes, max_files)
+
class CephFSClients(object):
def __init__(self, module_inst, fscid):
try:
cfs = CephFS()
root_dir = root_dir.encode()
- paths = cfs.get_dir_list(root_dir, depth)
+ paths = cfs.ls_dir(root_dir, depth)
# Convert (bytes => string) and prettify paths (strip slashes).
paths = [p.decode().rstrip('/') for p in paths if p != root_dir]
except (cephfs.ObjectNotFound, cephfs.PermissionError):
from contextlib import contextmanager
+import datetime
+import os
import six
import cephfs
return [{'id': fs['id'], 'name': fs['mdsmap']['fs_name']}
for fs in fsmap['filesystems']]
+ @classmethod
+ def fs_name_from_id(cls, fs_id):
+ """
+ Get the filesystem name from ID.
+ :param fs_id: The filesystem ID.
+ :type fs_id: int | str
+ :return: The filesystem name or None.
+ :rtype: str | None
+ """
+ fs_map = mgr.get("fs_map")
+ fs_info = list(filter(lambda x: str(x['id']) == str(fs_id),
+ fs_map['filesystems']))
+ if not fs_info:
+ return None
+ return fs_info[0]['mdsmap']['fs_name']
+
def __init__(self, fs_name=None):
logger.debug("[CephFS] initializing cephfs connection")
self.cfs = cephfs.LibCephFS(rados_inst=mgr.rados)
if d:
self.cfs.closedir(d)
- def get_dir_list(self, dirpath, level):
+ def ls_dir(self, path, level):
"""
- :param dirpath: The root directory path.
- :type dirpath: str | bytes
+ List directories of specified path.
+ :param path: The root directory path.
+ :type path: str | bytes
:param level: The number of steps to go down the directory tree.
:type level: int
- :return: A list of directory paths (bytes encoded).
+ :return: A list of directory paths (bytes encoded). The specified
+ root directory is also included.
+ Example:
+ ls_dir('/photos', 1) => [
+ b'/photos', b'/photos/flowers', b'/photos/cars'
+ ]
:rtype: list
"""
- if isinstance(dirpath, six.string_types):
- dirpath = dirpath.encode()
- logger.debug("[CephFS] get_dir_list dirpath=%s level=%s", dirpath,
- level)
+ if isinstance(path, six.string_types):
+ path = path.encode()
+ logger.debug("[CephFS] get_dir_list dir_path=%s level=%s",
+ path, level)
if level == 0:
- return [dirpath]
- logger.debug("[CephFS] opening dirpath=%s", dirpath)
- with self.opendir(dirpath) as d:
+ return [path]
+ logger.debug("[CephFS] opening dir_path=%s", path)
+ with self.opendir(path) as d:
dent = self.cfs.readdir(d)
- paths = [dirpath]
+ paths = [path]
while dent:
logger.debug("[CephFS] found entry=%s", dent.d_name)
if dent.d_name in [b'.', b'..']:
continue
if dent.is_dir():
logger.debug("[CephFS] found dir=%s", dent.d_name)
- subdirpath = b''.join([dirpath, dent.d_name, b'/'])
- paths.extend(self.get_dir_list(subdirpath, level-1))
+ subdir_path = os.path.join(path, dent.d_name)
+ paths.extend(self.ls_dir(subdir_path, level - 1))
dent = self.cfs.readdir(d)
return paths
- def dir_exists(self, dirpath):
+ def dir_exists(self, path):
try:
- with self.opendir(dirpath):
+ with self.opendir(path):
return True
except cephfs.ObjectNotFound:
return False
- def mkdirs(self, dirpath):
- if dirpath == '/':
+ def mk_dirs(self, path):
+ """
+ Create a directory.
+ :param path: The path of the directory.
+ """
+ if path == os.sep:
raise Exception('Cannot create root directory "/"')
- if self.dir_exists(dirpath):
+ if self.dir_exists(path):
+ return
+ logger.info("[CephFS] Creating directory: %s", path)
+ self.cfs.mkdirs(path, 0o755)
+
+ def rm_dir(self, path):
+ """
+ Remove a directory.
+ :param path: The path of the directory.
+ """
+ if path == os.sep:
+ raise Exception('Cannot remove root directory "/"')
+ if not self.dir_exists(path):
return
+ logger.info("[CephFS] Removing directory: %s", path)
+ self.cfs.rmdir(path)
+
+ def mk_snapshot(self, path, name=None, mode=0o755):
+ """
+ Create a snapshot.
+ :param path: The path of the directory.
+ :type path: str
+ :param name: The name of the snapshot. If not specified,
+ a name using the current time in RFC3339 UTC format
+ will be generated.
+ :type name: str | None
+ :param mode: The permissions the directory should have
+ once created.
+ :type mode: int
+ :return: Returns the name of the snapshot.
+ :rtype: str
+ """
+ if name is None:
+ now = datetime.datetime.now()
+ tz = now.astimezone().tzinfo
+ name = now.replace(tzinfo=tz).isoformat('T')
+ client_snapdir = self.cfs.conf_get('client_snapdir')
+ snapshot_path = os.path.join(path, client_snapdir, name)
+ logger.info("[CephFS] Creating snapshot: %s", snapshot_path)
+ self.cfs.mkdir(snapshot_path, mode)
+ return name
- logger.info("[CephFS] Creating directory: %s", dirpath)
- self.cfs.mkdirs("{}".format(dirpath).encode('utf-8'), 0o755)
+ def ls_snapshots(self, path):
+ """
+ List snapshots for the specified path.
+ :param path: The path of the directory.
+ :type path: str
+ :return: A list of dictionaries containing the name and the
+ creation time of the snapshot.
+ :rtype: list
+ """
+ result = []
+ client_snapdir = self.cfs.conf_get('client_snapdir')
+ path = os.path.join(path, client_snapdir).encode()
+ with self.opendir(path) as d:
+ dent = self.cfs.readdir(d)
+ while dent:
+ if dent.is_dir():
+ if dent.d_name not in [b'.', b'..']:
+ snapshot_path = os.path.join(path, dent.d_name)
+ stat = self.cfs.stat(snapshot_path)
+ result.append({
+ 'name': dent.d_name.decode(),
+ 'path': snapshot_path.decode(),
+ 'created': '{}Z'.format(stat.st_ctime.isoformat('T'))
+ })
+ dent = self.cfs.readdir(d)
+ return result
+
+ def rm_snapshot(self, path, name):
+ """
+ Remove a snapshot.
+ :param path: The path of the directory.
+ :type path: str
+ :param name: The name of the snapshot.
+ :type name: str
+ """
+ client_snapdir = self.cfs.conf_get('client_snapdir')
+ snapshot_path = os.path.join(path, client_snapdir, name)
+ logger.info("[CephFS] Removing snapshot: %s", snapshot_path)
+ self.cfs.rmdir(snapshot_path)
+
+ def get_quotas(self, path):
+ """
+ Get the quotas of the specified path.
+ :param path: The path of the directory/file.
+ :type path: str
+ :return: Returns a dictionary containing 'max_bytes'
+ and 'max_files'.
+ :rtype: dict
+ """
+ try:
+ max_bytes = int(self.cfs.getxattr(path, 'ceph.quota.max_bytes'))
+ except cephfs.NoData:
+ max_bytes = 0
+ try:
+ max_files = int(self.cfs.getxattr(path, 'ceph.quota.max_files'))
+ except cephfs.NoData:
+ max_files = 0
+ return {'max_bytes': max_bytes, 'max_files': max_files}
+
+ def set_quotas(self, path, max_bytes=None, max_files=None):
+ """
+ Set the quotas of the specified path.
+ :param path: The path of the directory/file.
+ :type path: str
+ :param max_bytes: The byte limit.
+ :type max_bytes: int | None
+ :param max_files: The file limit.
+ :type max_files: int | None
+ """
+ self.cfs.setxattr(path, 'ceph.quota.max_bytes',
+ str(max_bytes if max_bytes else 0).encode(), 0)
+ self.cfs.setxattr(path, 'ceph.quota.max_files',
+ str(max_files if max_files else 0).encode(), 0)
def create_path(self, path):
cfs = CephFS(self.fs_name)
- if not cfs.dir_exists(path):
- cfs.mkdirs(path)
+ cfs.mk_dirs(path)
@classmethod
def from_fsal_block(cls, fsal_block):