import errno
import logging
from contextlib import contextmanager
-from typing import Dict, Union
+from typing import Optional
import cephfs
raise VolumeException(-errno.EINTR, "clone operation interrupted")
def set_quota_on_clone(fs_handle, clone_volumes_pair):
- attrs = {} # type: Dict[str, Union[int, str, None]]
src_path = clone_volumes_pair[1].snapshot_data_path(clone_volumes_pair[2])
dst_path = clone_volumes_pair[0].path
+ quota = None # type: Optional[int]
try:
- attrs["quota"] = int(fs_handle.getxattr(src_path,
- 'ceph.quota.max_bytes'
- ).decode('utf-8'))
+ quota = int(fs_handle.getxattr(src_path, 'ceph.quota.max_bytes').decode('utf-8'))
except cephfs.NoData:
- attrs["quota"] = None
+ pass
- if attrs["quota"] is not None:
- clone_volumes_pair[0].set_attrs(dst_path, attrs)
+ if quota is not None:
+ try:
+ fs_handle.setxattr(dst_path, 'ceph.quota.max_bytes', str(quota).encode('utf-8'), 0)
+ except cephfs.InvalidValue:
+ raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(quota))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
def do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel):
with open_volume_lockless(fs_client, volname) as fs_handle: