AT_STATX_SYNC_AS_STAT = 0x0000
AT_STATX_FORCE_SYNC = 0x2000
AT_STATX_DONT_SYNC = 0x4000
+AT_FDCWD = -100
cdef int AT_SYMLINK_NOFOLLOW_CDEF = AT_SYMLINK_NOFOLLOW
CEPH_STATX_BASIC_STATS = 0x7ff
cdef int CEPH_STATX_BASIC_STATS_CDEF = CEPH_STATX_BASIC_STATS
f'file at path {trash_path}: {e}')
raise
+ def cptree(self, src_path, dst_path, should_sync_attrs=False,
+ cp_src_dir=True, should_cancel=False, suppress_errors=False):
+ '''
+ Copy entire file hierarchy under src using depth-first (to prevent
+ excessive memory consumption) and non-recursive (to prevent hitting
+ Python's max recursion limit error) approach.
+
+ If src is regfile, symlink or something else, copy it to dst and return.
+ '''
+ if isinstance(src_path, str):
+ src_path = src_path.encode('utf-8')
+ else:
+ assert isinstance(src_path, bytes)
+
+ if isinstance(dst_path, str):
+ dst_path = dst_path.encode('utf-8')
+ else:
+ assert isinstance(dst_path, bytes)
+
+ if not should_cancel:
+ should_cancel = lambda: False
+
+ # stx_b = statx buffer
+ stx_b = self.statx(src_path, CEPH_STATX_MODE, AT_SYMLINK_NOFOLLOW)
+ if stat.S_ISDIR(stx_b['mode']):
+ cptree_worker = CptreeWorker(
+ self, src_path, dst_path, should_sync_attrs, cp_src_dir,
+ should_cancel, suppress_errors)
+
+ cptree_worker.start()
+ elif stat.S_ISREG(stx_b['mode']):
+ src_dir = os.path.dirname(src_path)
+ src_file_name = os.path.basename(src_path)
+
+ src_fd = self.open(src_dir, os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ dst_fd = self.open(dst_path, os.O_RDONLY | os.O_DIRECTORY, 0o755)
+
+ copy_reg_file(self, src_fd, dst_fd, src_file_name, should_sync_attrs)
+ elif stat.S_ISLNK(stx_b['mode']):
+ src_dir = os.path.dirname(src_path)
+ src_link_name = os.path.basename(src_path)
+
+ src_fd = self.open(src_dir, os.O_RDONLY | os.O_DIRECTORY, 0o755)
+ dst_fd = self.open(dst_path, os.O_RDONLY | os.O_DIRECTORY, 0o755)
+
+ copy_sym_link(self, src_fd, dst_fd, src_link_name, should_sync_attrs)
+ else:
+ raise RuntimeError('expected a directory, regfile or symlink but '
+ f'found something else. src = {self.src_path}')
+
class UnlinkTreeWorker:
'''
if not suppress_errors:
raise
+
+
+# following code includes cptree() and related helper methods.
+
+class CptreeWorker:
+ '''
+ Contains code to non-recursively copy a file hierarchy present under a
+ given path with a depth-first approach. And while doing so, handle
+ case where a dir can't be copied due to permission issue.
+ '''
+
+ def __init__(self, fs, src_path, dst_path, should_sync_attrs=False,
+ cp_src_dir=False, should_cancel=lambda: False,
+ suppress_errors=False):
+ self.fs = fs
+
+ # source and destination path passed by the user.
+ self.src_path = src_path
+ self.dst_path = dst_path
+ self._do_sanity_check_for_paths()
+
+ # in case of subvolume snap clone, src's UUID dir is not copied, only
+ # its contents are copied to dst's UUID dir. This param is meant for
+ # indicating this.
+ self.cp_src_dir = cp_src_dir
+
+ # set attrs on dst files to same as src files.
+ self.should_sync_attrs = should_sync_attrs
+
+ self.should_cancel = should_cancel
+ self.suppress_errors = suppress_errors
+
+ # Stack needed for traversing/copying file hierarchy in non-recursive,
+ # depth-first fashion.
+ self.stack = deque([])
+
+ # Current dir that is being copied. It should always be topmost entry of
+ # stack.
+ self.curr_dir = None
+
+ def _do_sanity_check_for_paths(self):
+ if self.src_path == b'/':
+ raise PermissionError(1, 'can\'t copy dir into itself')
+
+ # check if self.src_path is ancestor of self.dst_path.
+ src_path_comp = self.src_path + b'/'
+ if src_path_comp in self.dst_path:
+ raise PermissionError(1, 'can\'t copy dir into itself')
+
+ def notify_parent_dir(self):
+ '''
+ Add current dir's name to parent dir's "de_ignore_list". This is
+ necessary since parent dir can't be copied when current dir can't be
+ copied.
+ '''
+ # ensure we are dealing with the dir at the top of the stack.
+ assert self.curr_dir is self.stack[-1]
+
+ if len(self.stack) < 2:
+ return
+
+ parent_dir = self.stack[-2]
+ parent_dir.add_to_de_ignore_list(self.curr_dir.name)
+
+ def add_dir_to_stack(self, de_name):
+ '''
+ Add new dir to stack and start traversing it. If it fails, add this
+ new dir to current dir's ignorelist since most likely we don't have
+ permissions for it.
+ '''
+ # ensure we are dealing with the dir at the top of the stack.
+ assert self.curr_dir is self.stack[-1]
+
+ try:
+ self.stack.append(CptreeDir(self.fs, de_name, de_name,
+ self.curr_dir,
+ should_sync_attrs=self.should_sync_attrs))
+ return True
+ except Error as e:
+ if self.suppress_errors:
+ # add to ignore list, traversal should continue for current dir.
+ log.info(f'dir "{de_name}" couldn\'t be opened and therefore '
+ 'it can\'t be removed. perhaps permissions for it '
+ 'are not granted.')
+ self.curr_dir.add_to_de_ignore_list(de_name)
+
+ return False
+ else:
+ raise
+
+ def start(self):
+ # initiate stack with first entry
+ try:
+ if self.cp_src_dir:
+ src_dir_name = os.path.basename(self.src_path)
+ dst_path = os.path.join(self.dst_path, src_dir_name)
+ else:
+ dst_path = self.dst_path
+ self.stack.append(CptreeDir(self.fs, self.src_path, dst_path,
+ parent_dir=None,
+ should_sync_attrs=self.should_sync_attrs,
+ cp_src_dir=self.cp_src_dir))
+ except Exception as e:
+ log.error('opening root dir of the file tree failed with exception '
+ f'"{e}", exiting.')
+ if self.suppress_errors:
+ return
+ else:
+ raise
+
+ while self.stack:
+ if self.should_cancel():
+ raise OpCanceled('cptree')
+
+ self.curr_dir = self.stack[-1]
+ finished_copying_curr_dir = True
+
+ # de = directory entry
+ de = self.curr_dir.read_src_dir()
+ while de:
+ if self.should_cancel():
+ raise OpCanceled('cptree')
+
+ if de.is_dir():
+ if self.add_dir_to_stack(de.d_name):
+ # since adding new dir to stack was successful, stop
+ # traversing current dir and start traversing the new
+ # dir that has been freshly added to the stack.
+ finished_copying_curr_dir = False
+ break
+ elif de.is_symbol_file():
+ self.curr_dir.copy_sym_link(de.d_name)
+ else:
+ self.curr_dir.copy_reg_file(de.d_name)
+
+ de = self.curr_dir.read_src_dir()
+
+ if finished_copying_curr_dir:
+ # XXX if attrs are sync-ed during creation of dir on destination
+ # side, it's mtime would change as files are copied underneath
+ # it. to avoid this, first copy all files and then sync attrs.
+ self.curr_dir.sync_attrs()
+
+ if self.curr_dir.has_any_fs_op_failed():
+ self.notify_parent_dir()
+ self.stack.pop()
+
+
+class CptreeDir:
+ '''
+ Contains code for reading and copying dir entries and for handling cases
+ when it can't be done due to lack of permission.
+
+ Although named CptreeDir, this class's objects operates over a pair of dir
+ -- src dir as well as dst dir.
+ '''
+
+ def __init__(self, fs, src_rel_path, dst_rel_path, parent_dir=None,
+ should_sync_attrs=False, cp_src_dir=True):
+ self.fs = fs
+
+ self.src_rel_path = src_rel_path
+ self.dst_rel_path = dst_rel_path
+
+ self.should_sync_attrs = should_sync_attrs
+ self.cp_src_dir = cp_src_dir
+
+ # needed to open src dir fd and dst dir fd.
+ self.parent_dir_src_fd = parent_dir.src_fd if parent_dir else AT_FDCWD
+ self.parent_dir_dst_fd = parent_dir.dst_fd if parent_dir else AT_FDCWD
+
+ self.copy_dir()
+
+ self.src_fd = self.fs.openat(self.parent_dir_src_fd, self.src_rel_path,
+ os.O_RDONLY | os.O_DIRECTORY, 0o755)
+
+ self.dst_fd = self.fs.openat(self.parent_dir_dst_fd, self.dst_rel_path,
+ os.O_RDONLY | os.O_DIRECTORY, 0o755)
+
+ self.src_handle = self.fs.fdopendir(self.src_fd)
+
+ # List of dir entries to be ignored instead of copying them.
+ self.de_ignore_list = []
+
+ # Indicates whether an error occured during call to readdir().
+ self.has_readdir_failed = False
+
+ def __str__(self):
+ return f'{self.src_rel_path}, {self.dst_rel_path}'
+
+ def add_to_de_ignore_list(self, de_name):
+ self.de_ignore_list.append(de_name)
+
+ def set_readdir_error(self):
+ self.has_readdir_failed = True
+
+ def should_skip_d_name(self, de_name):
+ return de_name in self.de_ignore_list
+
+ def has_any_fs_op_failed(self):
+ return self.has_readdir_failed or len(self.de_ignore_list) > 0
+
+ def read_src_dir(self):
+ '''
+ Get sub-directory of current directory.
+ '''
+ # Assuming True for now, if it's not empty it will be set to
+ # False by the following loop.
+ self.is_empty = True
+
+ try:
+ # de = directory entry
+ de = self.fs.readdir(self.src_handle)
+ while de:
+ if de.d_name in (b'.', b'..'):
+ pass
+ elif de.d_name in self.de_ignore_list:
+ self.is_empty = False
+ log.debug(
+ 'readdir() has previously '
+ f'failed for dir entry "{de.d_name}", avoiding '
+ 'running readdir() on it again.')
+ else:
+ self.is_empty = False
+ return de
+
+ de = self.fs.readdir(self.src_handle)
+ except Error as e:
+ # This is the tricky one: it's an error on this
+ # directory, not on a entry in this directory
+ log.error(f'Exception occured: "{e}"')
+ self.set_readdir_error()
+
+ def copy_dir(self):
+ # XXX: cp_src_dir=False implies don't copy dir, copy only its contents
+ # instead. This is used for subvolume cloning.
+ if not self.cp_src_dir:
+ return
+
+ # create dir on dst side.
+ self.fs.mkdirat(self.parent_dir_dst_fd, self.dst_rel_path, 0o755)
+
+ # XXX: Dir's mtime gets updated when a file is copied underneath it.
+ # Therefore, call this method only after all files have been copied
+ # underneath the dir on dst side so that mtime on it and mtime on src
+ # dir is the same.
+ def sync_attrs(self):
+ '''
+ Copy value of attributes from source side to the directory on
+ destination side.
+ '''
+ if self.should_sync_attrs:
+ sync_attrs(self.fs, self.parent_dir_src_fd, self.parent_dir_dst_fd,
+ self.dst_rel_path)
+
+ def copy_reg_file(self, de_name):
+ copy_reg_file(self.fs, self.src_fd, self.dst_fd, de_name,
+ self.should_sync_attrs)
+
+ def copy_sym_link(self, de_name):
+ copy_sym_link(self.fs, self.src_fd, self.dst_fd, de_name,
+ self.should_sync_attrs)
+
+
+def sync_attrs(fs, src_fd, dst_fd, de_name, src_stx_b=None):
+ if not src_stx_b:
+ flags = (CEPH_STATX_UID | CEPH_STATX_GID | CEPH_STATX_MODE |
+ CEPH_STATX_ATIME | CEPH_STATX_MTIME)
+ # src_stx_b = statx buffer for source path
+ src_stx_b = fs.statxat(src_fd, de_name, flags, AT_SYMLINK_NOFOLLOW)
+
+ src_uid = src_stx_b["uid"]
+ src_gid = src_stx_b["gid"]
+ src_mode = src_stx_b["mode"]
+ src_timestamps = (time.mktime(src_stx_b["atime"].timetuple()),
+ time.mktime(src_stx_b["mtime"].timetuple()))
+
+ try:
+ fs.chownat(dst_fd, de_name, src_uid, src_gid, AT_SYMLINK_NOFOLLOW)
+ fs.chmodat(dst_fd, de_name, src_mode, AT_SYMLINK_NOFOLLOW)
+ fs.utimensat(dst_fd, de_name, src_timestamps, AT_SYMLINK_NOFOLLOW)
+ except Exception as e:
+ log.error('Exception occurred while synchronizing attrs for '
+ f'"{de_name}". Exception: ({e})')
+ raise e
+
+
+def copy_reg_file(fs, src_fd, dst_fd, file_name, should_sync_attrs=False):
+ src_file_fd = dst_file_fd = None
+ try:
+ src_file_fd = fs.openat(src_fd, file_name, os.O_RDONLY, 0o755)
+ dst_file_fd = fs.openat(dst_fd, file_name, os.O_CREAT | os.O_TRUNC |
+ os.O_WRONLY, 0o755)
+ except Exception:
+ if src_file_fd:
+ fs.close(src_file_fd)
+ if dst_file_fd:
+ fs.close(dst_file_fd)
+ raise
+
+ while True:
+ data = fs.read(src_file_fd, -1, 1 * 1024 * 1024)
+ if not len(data):
+ break
+
+ written = 0
+ while written < len(data):
+ written += fs.write(dst_file_fd, data[written:], -1)
+
+ if should_sync_attrs:
+ sync_attrs(fs, src_fd, dst_fd, file_name)
+
+ fs.fsync(dst_file_fd, 0)
+ fs.close(src_file_fd)
+ fs.close(dst_file_fd)
+
+
+def copy_sym_link(fs, src_fd, dst_fd, de_name, should_sync_attrs=False):
+ flags = (CEPH_STATX_UID | CEPH_STATX_GID | CEPH_STATX_MODE |
+ CEPH_STATX_ATIME | CEPH_STATX_MTIME | CEPH_STATX_SIZE)
+ # src_stx_b = statx buffer for source path
+ src_stx_b = fs.statxat(src_fd, de_name, flags, AT_SYMLINK_NOFOLLOW)
+ size = src_stx_b['size']
+
+ src_stx_b = fs.statxat(src_fd, de_name, flags, AT_SYMLINK_NOFOLLOW)
+ try:
+ target = fs.readlinkat(src_fd, de_name, size)
+ except Exception as e:
+ log.info('Following exception occurred while reading '
+ f'symlink: {e}')
+ raise
+
+ try:
+ fs.symlinkat(target, dst_fd, de_name)
+ except Exception as e:
+ log.info('Following exception occurred while creating '
+ f'symlink: {e}')
+ raise
+
+ if should_sync_attrs:
+ sync_attrs(fs, src_fd, dst_fd, de_name, src_stx_b)