$ ceph fs subvolume snapshot ls <vol_name> <subvol_name> [--group_name <subvol_group_name>]
+Fetch the metadata of a snapshot using::
+
+ $ ceph fs subvolume snapshot info <vol_name> <subvol_name> <snap_name> [--group_name <subvol_group_name>]
+
+The output format is json and contains fields as follows.
+
+* created_at: time of creation of snapshot in the format "YYYY-MM-DD HH:MM:SS:ffffff"
+* data_pool: data pool the snapshot belongs to
+* has_pending_clones: "yes" if snapshot clone is in progress otherwise "no"
+* protected: "yes" if snapshot is protected otherwise "no"
+* size: snapshot size in bytes
+
Cloning Snapshots
-----------------
subvol_md = self._fs_cmd(*args)
return subvol_md
+ def _get_subvolume_snapshot_info(self, vol_name, subvol_name, snapname, group_name=None):
+ args = ["subvolume", "snapshot", "info", vol_name, subvol_name, snapname]
+ if group_name:
+ args.append(group_name)
+ args = tuple(args)
+ snap_md = self._fs_cmd(*args)
+ return snap_md
+
def _delete_test_volume(self):
self._fs_cmd("volume", "rm", self.volname, "--yes-i-really-mean-it")
# verify trash dir is clean
self._wait_for_trash_empty()
+ def test_subvolume_snapshot_info(self):
+
+ """
+ tests the 'fs subvolume snapshot info' command
+ """
+
+ snap_metadata = ["created_at", "data_pool", "has_pending_clones", "protected", "size"]
+
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # do some IO
+ self._do_subvolume_io(subvolume, number_of_files=1)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # now, protect snapshot
+ self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+ snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
+ self.assertNotEqual(len(snap_info), 0)
+ for md in snap_metadata:
+ if md not in snap_info:
+ raise RuntimeError("%s not present in the metadata of subvolume snapshot" % md)
+ self.assertEqual(snap_info["protected"], "yes")
+ self.assertEqual(snap_info["has_pending_clones"], "no")
+
+ # now, unprotect snapshot
+ self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
def test_subvolume_snapshot_create_idempotence(self):
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
import uuid
import errno
import logging
+from datetime import datetime
import cephfs
snappath = self.snapshot_path(snapname)
rmsnap(self.fs, snappath)
+ def snapshot_info(self, snapname):
+ snappath = self.snapshot_path(snapname)
+ snap_info = {}
+ try:
+ snap_attrs = {'created_at':'ceph.snap.btime', 'size':'ceph.dir.rbytes',
+ 'data_pool':'ceph.dir.layout.pool'}
+ for key, val in snap_attrs.items():
+ snap_info[key] = self.fs.getxattr(snappath, val)
+ return {'size': int(snap_info['size']),
+ 'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
+ 'data_pool': snap_info['data_pool'].decode('utf-8'),
+ 'protected': "yes" if self.is_snapshot_protected(snapname) else "no",
+ 'has_pending_clones': "yes" if self.has_pending_clones(snapname) else "no"}
+ except cephfs.Error as e:
+ if e.errno == errno.ENOENT:
+ raise VolumeException(-errno.ENOENT,
+ "snapshot '{0}' doesnot exist".format(snapname))
+ raise VolumeException(-e.args[0], e.args[1])
+
def list_snapshots(self):
try:
dirpath = os.path.join(self.path,
ret = self.volume_exception_to_retval(ve)
return ret
+ def subvolume_snapshot_info(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
+ snap_info_dict = subvolume.snapshot_info(snapname)
+ ret = 0, json.dumps(snap_info_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
def list_subvolume_snapshots(self, **kwargs):
ret = 0, "", ""
volname = kwargs['vol_name']
"and optionally, in a specific subvolume group",
'perm': 'rw'
},
+ {
+ 'cmd': 'fs subvolume snapshot info '
+ 'name=vol_name,type=CephString '
+ 'name=sub_name,type=CephString '
+ 'name=snap_name,type=CephString '
+ 'name=group_name,type=CephString,req=false ',
+ 'desc': "Get the metadata of a CephFS subvolume snapshot "
+ "and optionally, in a specific subvolume group",
+ 'perm': 'r'
+ },
{
'cmd': 'fs subvolume snapshot rm '
'name=vol_name,type=CephString '
group_name=cmd.get('group_name', None),
force=cmd.get('force', False))
+ def _cmd_fs_subvolume_snapshot_info(self, inbuf, cmd):
+ return self.vc.subvolume_snapshot_info(vol_name=cmd['vol_name'],
+ sub_name=cmd['sub_name'],
+ snap_name=cmd['snap_name'],
+ group_name=cmd.get('group_name', None))
+
def _cmd_fs_subvolume_snapshot_ls(self, inbuf, cmd):
return self.vc.list_subvolume_snapshots(vol_name=cmd['vol_name'],
sub_name=cmd['sub_name'],