from c_cephfs cimport *
from rados cimport Rados
-from collections import namedtuple
+from collections import namedtuple, deque
from datetime import datetime
import os
import time
+import stat
from typing import Any, Dict, Optional
+from logging import getLogger
+
+
+log = getLogger(__name__)
+
AT_SYMLINK_NOFOLLOW = 0x0100
AT_STATX_SYNC_TYPE = 0x6000
class PermissionDenied(OSError):
pass
+class OpCanceled(OSError):
+ def __init__(self, op_name):
+ '''
+ op_name should be the name of (FS) operation that has been cancelled.
+ '''
+ self.errno = 125 # ECANCELED
+ self.strerror = f'CephFS op {op_name} was cancelled by the user'
+
+ super(OpCanceled, self).__init__(self.errno, self.strerror)
+
+
cdef errno_to_exception = {
EPERM : PermissionError,
ENOENT : ObjectNotFound,
finally:
free(buf)
+
+ def rmtree(self, trash_path, should_cancel, suppress_errors=False):
+ '''
+ Delete entire file hierarchy present under trash_path when trash_path is
+ a dir. Do this deletion using depth-first (to prevent excessive memory
+ consumption) and non-recursive (to prevent hitting Python's max recursion
+ limit error) approach.
+
+ If trash_path is a path to regfile, symlink or something else, delete
+ them and return.
+ '''
+ # st_b = stat buffer
+ st_b = self.stat(trash_path, AT_SYMLINK_NOFOLLOW)
+ if stat.S_ISDIR(st_b.st_mode):
+ NonRecursiveRmtree(self, trash_path, should_cancel,
+ suppress_errors).rmtree()
+ else:
+ try:
+ self.unlink(trash_path)
+ return
+ except Exception as e:
+ log.info('Following exception occurred while unlinking '
+ f'file at path {trash_path}: {e}')
+ raise
+
+
+class NonRecursiveRmtree:
+ '''
+ Contains code to delete entire file tree under a directory with a
+ depth-first, non-recursive approach along with some helper code.
+
+ Primary focus of this class is to traverse the file hierarchy by operating
+ on the stack while using class RmTreeDir for running opendir(), rmdir() and
+ unlink() (in a safe way) and recording failures.
+ '''
+
+ def __init__(self, fs, trash_path, should_cancel, suppress_errors=False):
+ self.fs = fs
+ self.trash_path = trash_path
+
+ self.should_cancel = should_cancel
+ self.suppress_errors = suppress_errors
+
+ # Stack needed for traversing the file heirarchy under trash_path in
+ # depth-first, non-recursive fashion. Each stack member is an instance
+ # of class RmtreeDir.
+ self.stack = deque([])
+
+ # Current directory, dir entries of which are being currently removed.
+ # It should always be the directory at the top of stack, it should
+ # always be an instance of class RmtreeDir.
+ self.curr_dir = None
+
+ def add_dir_to_stack(self, de_name):
+ '''
+ Add new dir to stack and start traversing it. If it fails, add this
+ new dir to current dir's ignorelist since most likely we don't have
+ permissions for it.
+ '''
+ # ensure we are dealing with the dir at the top of the stack.
+ assert self.curr_dir is self.stack[-1]
+
+ de_path = os.path.join(self.curr_dir.path, de_name)
+ try:
+ self.stack.append(RmtreeDir(self.fs, de_path))
+ return True
+ except Error as e:
+ if self.suppress_errors:
+ # add to ignore list, traversal should continue for current dir.
+ log.info(f'dir "{de_name}" couldn\'t be opened and therefore '
+ 'it can\'t be remvoved. perhaps permissions for it '
+ 'are not granted.')
+ self.curr_dir.add_to_de_ignore_list(de_name)
+
+ return False
+ else:
+ raise
+
+ def notify_parent_dir(self):
+ '''
+ Add current dir's name to parent dir's "de_ignore_list". This is
+ necessary since parent dir can't be deleted when current dir can't be
+ deleted.
+ '''
+ # ensure we are dealing with the dir at the top of the stack.
+ assert self.curr_dir is self.stack[-1]
+
+ if len(self.stack) < 2:
+ return
+ parent_dir = self.stack[-2]
+ parent_dir.add_to_de_ignore_list(self.curr_dir.name)
+
+ def rmtree(self):
+ '''
+ This is where depth-first, non-recursive traversal is done.
+ '''
+ try:
+ self.stack.append(RmtreeDir(self.fs, self.trash_path))
+ except Exception as e:
+ log.error('opening root dir of the file tree failed with exception '
+ f'"{e}", exiting.')
+ if self.suppress_errors:
+ return
+ else:
+ raise
+
+ while self.stack:
+ if self.should_cancel():
+ raise OpCanceled('rmtree')
+
+ self.curr_dir = self.stack[-1]
+
+ # de = directory entry
+ de = self.curr_dir.read_dir()
+ while de:
+ if self.should_cancel():
+ raise OpCanceled('rmtree')
+
+ if de.is_dir():
+ try:
+ self.curr_dir.try_rmdir(de.d_name, self.suppress_errors)
+ except ObjectNotEmpty:
+ if self.add_dir_to_stack(de.d_name):
+ # since adding new dir to stack was successful, stop
+ # traversing the current dir and start traversing
+ # the new dir that has been freshly added to the
+ # stack.
+ break
+ else:
+ self.curr_dir.try_unlink(de.d_name, self.suppress_errors)
+
+ de = self.curr_dir.read_dir()
+
+ if self.curr_dir.has_any_fs_op_failed() or self.curr_dir.is_empty:
+ if self.curr_dir.has_any_fs_op_failed():
+ self.notify_parent_dir()
+
+ if self.curr_dir.is_empty:
+ try:
+ self.fs.rmdir(self.curr_dir.path)
+ except ObjectNotEmpty:
+ log.info(f'removing "{self.curr_dir.name}" failed with '
+ 'with ObjectNotEmpty even though dir empty '
+ 'implying it contains a snapshot in its snap'
+ 'dir')
+ self.notify_parent_dir()
+
+ self.stack.pop()
+
+
+class RmtreeDir:
+ '''
+ Holds the path, name and handle of the directory being traversed for
+ rmtree() along with some helper code.
+
+ Primary focus of this class is to run rmtree() and unlink() in a safe way
+ and record failures to prevent hitting them again in future. It serves as
+ helper for class NonRecursiveRmtree.
+ '''
+
+ def __init__(self, fs, path):
+ self.fs = fs
+
+ self.path = path
+ if isinstance(self.path, str):
+ self.path = self.path.encode('utf-8')
+ self.name = os.path.basename(self.path)
+ # XXX: exception (if) raised here should be handled by caller based on
+ # the context.
+ self.handle = self.fs.opendir(self.path)
+
+ # Is this directory empty? It will be set by self.read_dir().
+ self.is_empty = None
+
+ # List of dir entries to be ignored instead of calling rmdir()
+ # or unlink() for them.
+ self.de_ignore_list = []
+
+ # Indicates whether an error occured during call to readdir().
+ self.has_readdir_failed = False
+
+ # If a dir entry has been removed and readdir() returns None,
+ # rewinddir() should be called since POSIX doesn't guarantee
+ # anything regarding behaviour of readdir() when readdir() and
+ # unlink()/rmdir() calls are interleaved. Whenever calls to
+ # unlink()/rmdir() are made, reading dir might've to be
+ # restarted.
+ self.de_has_been_removed = False
+
+ def __str__(self):
+ return self.path
+
+ def add_to_de_ignore_list(self, de_name):
+ self.de_ignore_list.append(de_name)
+
+ def set_readdir_error(self):
+ self.has_readdir_failed = True
+
+ def should_skip_d_name(self, de_name):
+ return de_name in self.de_ignore_list
+
+ def has_any_fs_op_failed(self):
+ return self.has_readdir_failed or len(self.de_ignore_list) > 0
+
+ def read_dir(self):
+ '''
+ Read this dir, return a dentry besides . and .. and ignorelist-ed
+ dentries.
+
+ If a dentry was removed and return value of readdir() is None, rewind
+ the dir and staring read the dir again.
+ '''
+ # Assuming True for now, if it's not empty it will be set to
+ # False by the following loop.
+ self.is_empty = True
+
+ try:
+ de = self.fs.readdir(self.handle)
+ while de:
+ if de.d_name in (b'.', b'..'):
+ pass
+ elif de.d_name in self.de_ignore_list:
+ self.is_empty = False
+ else:
+ self.is_empty = False
+ return de
+
+ de = self.fs.readdir(self.handle)
+ if self.de_has_been_removed:
+ log.debug('rewinding and restarting reading current dir '
+ f'"{self.name}", since a dentry has been '
+ 'removed.')
+ self.handle.rewinddir()
+ # reset de_has_been_removed flag
+ self.de_has_been_removed = False
+
+ de = self.fs.readdir(self.handle)
+ except Error as e:
+ log.error(f'Exception occured: "{e}"')
+ self.set_readdir_error()
+
+ def try_rmdir(self, de_name, suppress_errors=False):
+ '''
+ Remove given directory. If that fails because its not empty, raise the
+ exception, the caller should handle it.
+
+ In case of a failure for some other reason, add it to the ignorelist
+ and tell caller whether to continue or break loop based through the
+ return value.
+ '''
+ de_path = os.path.join(self.path, de_name)
+ try:
+ self.fs.rmdir(de_path)
+ self.de_has_been_removed = True
+ except ObjectNotEmpty:
+ # XXX: push this dir to stack, done in the caller method
+ raise
+ except Error as e:
+ log.error('Following exception occured while calling rmdir() for '
+ f'dir "{self.name}": "{e}"')
+ self.add_to_de_ignore_list(de_name)
+
+ if not suppress_errors:
+ raise
+
+ def try_unlink(self, de_name, suppress_errors=False):
+ '''
+ Unlink given file and add it to the ignore list if that fails.
+ '''
+ de_path = os.path.join(self.path, de_name)
+ try:
+ self.fs.unlink(de_path)
+ self.de_has_been_removed = True
+ except Error as e:
+ log.error('Following exception occured while calling unlink() for '
+ f'file "{de_name}": "{e}"')
+ self.add_to_de_ignore_list(de_name)
+
+ if not suppress_errors:
+ raise