cephfs = None
+
def setup_cephfs(config_file):
"""
Mouting a cephfs
"""
global cephfs
- cephfs = libcephfs.LibCephFS(conffile = config_file )
+ cephfs = libcephfs.LibCephFS(conffile=config_file)
cephfs.mount()
+
def mode_notation(mode):
"""
"""
- permission_bits = {'0':'---', '1':'--x', '2':'-w-', '3': '-wx', '4':'r--', '5':'r-x', '6':'rw-', '7':'rwx'}
+ permission_bits = {'0': '---', '1': '--x', '2': '-w-',
+ '3': '-wx', '4': 'r--', '5': 'r-x', '6': 'rw-', '7': 'rwx'}
mode = str(oct(mode))
notation = '-'
if mode[2] == '4':
notation += permission_bits[i]
return notation
+
def get_chunks(file_size):
chunk_start = 0
chunk_size = 0x20000 # 131072 bytes, default max ssl buffer size
final_chunk_size = file_size - chunk_start
yield(chunk_start, final_chunk_size)
+
def to_bytes(string):
- return bytes(string, encoding = 'utf-8')
+ return bytes(string, encoding='utf-8')
-def list_items(dir_name = ''):
+
+def list_items(dir_name=''):
if not isinstance(dir_name, bytes):
dir_name = to_bytes(dir_name)
if dir_name == '':
cephfs.closedir(d)
return items
+
def glob(dir_name, pattern):
if isinstance(dir_name, bytes):
dir_name = dir_name.decode('utf-8')
if dir_name == '/' or is_dir_exists(dir_name.rsplit('/', 1)[1], parent_dir):
for i in list_items(dir_name)[2:]:
if fnmatch.fnmatch(i.d_name.decode('utf-8'), pattern):
- paths.append(re.sub('\/+', '/', dir_name + '/' + i.d_name.decode('utf-8')))
+ paths.append(re.sub('\/+', '/', dir_name +
+ '/' + i.d_name.decode('utf-8')))
return paths
+
def get_all_possible_paths(pattern):
paths = []
dir_ = cephfs.getcwd()
is_rel_path = False
patterns = pattern.split('/')
paths.extend(glob(dir_, patterns[0]))
- patterns.pop(0)
+ patterns.pop(0)
for pattern in patterns:
for path in paths:
paths.extend(glob(path, pattern))
return [path for path in paths if fnmatch.fnmatch(path, '/'*is_rel_path + pattern)]
+
suffixes = ['B', 'K', 'M', 'G', 'T', 'P']
+
+
def humansize(nbytes):
i = 0
while nbytes >= 1024 and i < len(suffixes)-1:
f = ('%d' % nbytes).rstrip('.')
return '%s%s' % (f, suffixes[i])
+
def print_long(shell, file_name, flag, human_readable):
if not isinstance(file_name, bytes):
- file_name = to_bytes(file_name)
+ file_name = to_bytes(file_name)
info = cephfs.stat(file_name)
if flag:
- file_name = colorama.Fore.BLUE + file_name.decode('utf-8').rsplit('/', 1)[1] + '/'+ colorama.Style.RESET_ALL
+ file_name = colorama.Fore.BLUE + \
+ file_name.decode('utf-8').rsplit('/',
+ 1)[1] + '/' + colorama.Style.RESET_ALL
else:
- file_name = file_name.decode('utf-8').rsplit('/', 1)[1]
+ file_name = file_name.decode('utf-8').rsplit('/', 1)[1]
if human_readable:
- shell.poutput('{}\t{:10s} {} {} {} {}'.format(mode_notation(info.st_mode), humansize(info.st_size), info.st_uid, info.st_gid, info.st_mtime, file_name, sep = '\t'))
+ shell.poutput('{}\t{:10s} {} {} {} {}'.format(mode_notation(info.st_mode), humansize(
+ info.st_size), info.st_uid, info.st_gid, info.st_mtime, file_name, sep='\t'))
else:
- shell.poutput('{} {:12d} {} {} {} {}'.format(mode_notation(info.st_mode), info.st_size, info.st_uid, info.st_gid, info.st_mtime, file_name, sep = '\t'))
+ shell.poutput('{} {:12d} {} {} {} {}'.format(mode_notation(
+ info.st_mode), info.st_size, info.st_uid, info.st_gid, info.st_mtime, file_name, sep='\t'))
+
def word_len(word):
"""
Returns the word lenght, minus any color codes.
"""
if word[0] == '\x1b':
- return len(word) - 9
+ return len(word) - 9
return len(word)
-def is_dir_exists(dir_name, dir_ = ''):
- if dir_ == '':
+
+def is_dir_exists(dir_name, dir_=''):
+ if dir_ == '':
dir_ = cephfs.getcwd()
elif not isinstance(dir_, bytes):
dir_ = to_bytes(dir_)
dir_name = to_bytes(dir_name)
return len([i for i in set(list_items(dir_)) if i.d_name == dir_name and i.is_dir()]) > 0
-def is_file_exists(file_name, dir_ = ''):
+
+def is_file_exists(file_name, dir_=''):
if dir_ == '':
dir_ = cephfs.getcwd()
elif not isinstance(dir_, bytes):
dir_ = to_bytes(dir_)
if not isinstance(file_name, bytes):
if file_name.count('/') > 0:
- file_name = to_bytes(file_name.rsplit('/',1)[1])
+ file_name = to_bytes(file_name.rsplit('/', 1)[1])
else:
file_name = to_bytes(file_name)
return len([i for i in set(list_items(dir_)) if i.d_name == file_name and not i.is_dir()]) > 0
-def print_list(shell, words, termwidth = 79):
+
+def print_list(shell, words, termwidth=79):
if not words:
return
width = max([word_len(word) for word in words]) + 2
for i in range(row, nwords, nrows):
word = words[i]
if word[0] == '\x1b':
- shell.poutput('%-*s'% (width + 9, words[i]), end = '\n'if i + nrows >= nwords else '')
+ shell.poutput(
+ '%-*s' % (width + 9, words[i]), end='\n'if i + nrows >= nwords else '')
else:
- shell.poutput('%-*s'% (width, words[i]), end = '\n'if i + nrows >= nwords else '')
+ shell.poutput(
+ '%-*s' % (width, words[i]), end='\n'if i + nrows >= nwords else '')
+
def copy_from_local(shell, local_path, remote_path):
stdin = -1
stdin = 1
file_size = os.path.getsize(local_path)
# return
- fd = cephfs.open(to_bytes(remote_path), 'w', 0o666)
+ fd = cephfs.open(to_bytes(remote_path), 'w', 0o666)
if file_size == 0:
- return
+ return
progress = 0
while True:
data = file_.read(65536)
if not data:
break
- wrote = cephfs.write(fd, data, progress)
+ wrote = cephfs.write(fd, data, progress)
if wrote < 0:
break
progress += wrote
cephfs.close(fd)
- if stdin > 0:
+ if stdin > 0:
file_.close()
shell.poutput('')
+
def copy_to_local(shell, remote_path, local_path):
if not os.path.exists(local_path.rsplit('/', 1)[0]):
- os.makedirs(local_path.rsplit('/', 1)[0], exist_ok = True)
+ os.makedirs(local_path.rsplit('/', 1)[0], exist_ok=True)
fd = None
if len(remote_path.rsplit('/', 1)) > 2 and remote_path.rsplit('/', 1)[1] == '':
return
if fd:
fd.write(file_chunk)
else:
- shell.poutput(file_chunk.decode('utf-8'))
+ shell.poutput(file_chunk.decode('utf-8'))
cephfs.close(file_)
- if fd:
- fd.close()
+ if fd:
+ fd.close()
+
def dirwalk(dir_name, giveDirs=0):
"""
"""
dir_name = re.sub('\/+', '/', dir_name)
for item in list_items(dir_name)[2:]:
- fullpath = dir_name + '/'+ item.d_name.decode('utf-8')
+ fullpath = dir_name + '/' + item.d_name.decode('utf-8')
yield fullpath.rsplit('/', 1)[0] + '/'
if is_dir_exists(item.d_name, fullpath.rsplit('/', 1)[0]):
if not len(list_items(fullpath)[2:]):
- yield re.sub('\/+', '/',fullpath)
+ yield re.sub('\/+', '/', fullpath)
else:
- for x in dirwalk(fullpath):
+ for x in dirwalk(fullpath):
if giveDirs:
- yield x
+ yield x
else:
- yield x
+ yield x
else:
- yield re.sub('\/+', '/',fullpath)
+ yield re.sub('\/+', '/', fullpath)
+
class CephFSShell(Cmd):
self.intro = 'Ceph File System Shell'
self.interactive = False
self.umask = '2'
-
+
def default(self, line):
self.poutput('Unrecognized command:', line)
-
+
def set_prompt(self):
- self.prompt = '\033[01;33mCephFS:~' + colorama.Fore.LIGHTCYAN_EX + self.working_dir + colorama.Style.RESET_ALL + '\033[01;33m>>>\033[00m '
+ self.prompt = '\033[01;33mCephFS:~' + colorama.Fore.LIGHTCYAN_EX + \
+ self.working_dir + colorama.Style.RESET_ALL + \
+ '\033[01;33m>>>\033[00m '
def create_argparser(self, command):
try:
- argparse_args = getattr(self, 'argparse_'+ command)
+ argparse_args = getattr(self, 'argparse_' + command)
except AttributeError:
return None
- doc_lines = getattr(self, 'do_'+ command).__doc__.expandtabs().splitlines()
+ doc_lines = getattr(
+ self, 'do_' + command).__doc__.expandtabs().splitlines()
if ''in doc_lines:
blank_idx = doc_lines.index('')
usage = doc_lines[:blank_idx]
usage = doc_lines
description = []
parser = argparse.ArgumentParser(
- prog = command,
- usage = '\n'.join(usage),
- description = '\n'.join(description),
- formatter_class = argparse.ArgumentDefaultsHelpFormatter
+ prog=command,
+ usage='\n'.join(usage),
+ description='\n'.join(description),
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
for args, kwargs in argparse_args:
parser.add_argument(*args, **kwargs)
def complete_filenames(self, text, line, begidx, endidx):
if not text:
- completions = [x.d_name.decode('utf-8') + '/' * int(x.is_dir()) for x in list_items(cephfs.getcwd())[2:]]
+ completions = [x.d_name.decode(
+ 'utf-8') + '/' * int(x.is_dir()) for x in list_items(cephfs.getcwd())[2:]]
else:
if text.count('/') > 0:
- completions = [text.rsplit('/', 1)[0] + '/' + x.d_name.decode('utf-8') + '/'*int(x.is_dir()) for x in list_items('/'+ text.rsplit('/', 1)[0])[2:] if x.d_name.decode('utf-8').startswith(text.rsplit('/', 1)[1])]
+ completions = [text.rsplit('/', 1)[0] + '/' + x.d_name.decode('utf-8') + '/'*int(x.is_dir()) for x in list_items(
+ '/' + text.rsplit('/', 1)[0])[2:] if x.d_name.decode('utf-8').startswith(text.rsplit('/', 1)[1])]
else:
- completions = [x.d_name.decode('utf-8') + '/' * int(x.is_dir()) for x in list_items()[2:] if x.d_name.decode('utf-8').startswith(text)]
+ completions = [x.d_name.decode('utf-8') + '/' * int(x.is_dir()) for x in list_items()[
+ 2:] if x.d_name.decode('utf-8').startswith(text)]
if len(completions) == 1 and completions[0][-1] == '/':
dir_, file_ = completions[0].rsplit('/', 1)
- completions.extend([dir_ + '/' + x.d_name.decode('utf-8') + '/' * int(x.is_dir()) for x in list_items('/'+ dir_)[2:] if x.d_name.decode('utf-8').startswith(file_)])
+ completions.extend([dir_ + '/' + x.d_name.decode('utf-8') + '/' * int(x.is_dir())
+ for x in list_items('/' + dir_)[2:] if x.d_name.decode('utf-8').startswith(file_)])
return self.delimiter_complete(text, line, begidx, endidx, completions, '/')
return completions
-
+
def onecmd(self, line):
"""
Global error catcher
"""
try:
res = Cmd.onecmd(self, line)
- if self.interactive:
+ if self.interactive:
self.set_prompt()
return res
except ConnectionError as e:
self.poutput('Command aborted')
except Exception as e:
self.poutput(e)
- traceback.print_exc(file = sys.stdout)
+ traceback.print_exc(file=sys.stdout)
def complete_mkdir(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
return self.complete_filenames(text, line, begidx, endidx)
- mkdir_parser = argparse.ArgumentParser(description = 'Create the directory(ies), if they do not already exist.')
- mkdir_parser.add_argument('dirs', type = str, metavar = 'DIR_NAME', help = 'Name of new_directory.', nargs = '+')
- mkdir_parser.add_argument('-m', '--mode', action = 'store', help = 'Sets the access mode for the new directory.', type = str)
- mkdir_parser.add_argument('-p', '--parent', action = 'store_true', help = 'Create parent directories as necessary. When this option is specified, no error is reported if a directory already exists.')
-
+ mkdir_parser = argparse.ArgumentParser(
+ description='Create the directory(ies), if they do not already exist.')
+ mkdir_parser.add_argument(
+ 'dirs', type=str, metavar='DIR_NAME', help='Name of new_directory.', nargs='+')
+ mkdir_parser.add_argument('-m', '--mode', action='store',
+ help='Sets the access mode for the new directory.', type=str)
+ mkdir_parser.add_argument('-p', '--parent', action='store_true',
+ help='Create parent directories as necessary. When this option is specified, no error is reported if a directory already exists.')
+
@with_argparser(mkdir_parser)
def do_mkdir(self, args):
"""
Create directory.
- """
+ """
for dir_name in args.dirs:
path = to_bytes('/' + dir_name)
if args.mode:
def complete_put(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
index_dict = {1: self.path_complete}
return self.index_based_complete(text, line, begidx, endidx, index_dict)
- put_parser = argparse.ArgumentParser(description = 'Copy a file/directory to Ceph File System from Local File System.')
- put_parser.add_argument('local_path', type = str, help = 'Path of the file in the local system')
- put_parser.add_argument('remote_path', type = str, help = 'Path of the file in the remote system.', nargs = '?', default = '.')
- put_parser.add_argument('-f', '--force', action = 'store_true', help = 'Overwrites the destination if it already exists.')
+ put_parser = argparse.ArgumentParser(
+ description='Copy a file/directory to Ceph File System from Local File System.')
+ put_parser.add_argument('local_path', type=str,
+ help='Path of the file in the local system')
+ put_parser.add_argument(
+ 'remote_path', type=str, help='Path of the file in the remote system.', nargs='?', default='.')
+ put_parser.add_argument('-f', '--force', action='store_true',
+ help='Overwrites the destination if it already exists.')
@with_argparser(put_parser)
def do_put(self, args):
"""
Copy a file to Ceph File System from Local Directory.
"""
- root_src_dir = args.local_path
- root_dst_dir = args.remote_path
+ root_src_dir = args.local_path
+ root_dst_dir = args.remote_path
if args.local_path == '.':
root_src_dir = os.getcwd()
if root_dst_dir == '.' and (root_src_dir.count('/') > 0 and root_src_dir[-1] != '/'):
- root_dst_dir = root_src_dir.rsplit('/',1)[1]
+ root_dst_dir = root_src_dir.rsplit('/', 1)[1]
elif args.remote_path == '.':
- root_dst_dir = cephfs.getcwd().decode('utf-8')
+ root_dst_dir = cephfs.getcwd().decode('utf-8')
elif root_dst_dir[-1] != '/':
root_dst_dir += '/'
if args.local_path == '-' or os.path.isfile(root_src_dir):
cephfs.mkdirs(to_bytes(dst_dir), 0o777)
for file_ in files:
src_file = os.path.join(src_dir, file_)
- dst_file = re.sub('\/+', '/', '/'+ dst_dir + '/'+ file_)
+ dst_file = re.sub('\/+', '/', '/' + dst_dir + '/' + file_)
if (not args.force) and is_file_exists(re.sub('\/+', '/', dst_file)):
return
- copy_from_local(self, src_file, re.sub('\/+', '/', dst_file))
-
+ copy_from_local(
+ self, src_file, re.sub('\/+', '/', dst_file))
def complete_get(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
return self.complete_filenames(text, line, begidx, endidx)
- get_parser = argparse.ArgumentParser(description = 'Copy a file from Ceph File System from Local Directory.')
- get_parser.add_argument('remote_path', type = str, help = 'Path of the file in the remote system')
- get_parser.add_argument('local_path', type = str, help = 'Path of the file in the local system', nargs = '?', default = '.')
+ get_parser = argparse.ArgumentParser(
+ description='Copy a file from Ceph File System from Local Directory.')
+ get_parser.add_argument('remote_path', type=str,
+ help='Path of the file in the remote system')
+ get_parser.add_argument(
+ 'local_path', type=str, help='Path of the file in the local system', nargs='?', default='.')
@with_argparser(get_parser)
def do_get(self, args):
Copy a file/directory from Ceph File System to Local Directory.
"""
root_src_dir = args.remote_path
- root_dst_dir = args.local_path
+ root_dst_dir = args.local_path
if args.local_path == '.':
root_dst_dir = os.getcwd()
if args.remote_path == '.':
root_src_dir = cephfs.getcwd().decode('utf-8')
if args.local_path == '-':
copy_to_local(self, root_src_dir, '-')
- elif is_file_exists(args.remote_path):# any([i for i in list_items() if i.d_name.decode('utf-8') == args.remote_path and not i.is_dir()]):
- copy_to_local(self, root_src_dir, root_dst_dir + '/'+ root_src_dir)
- elif '/'in root_src_dir and is_file_exists(root_src_dir.rsplit('/', 1)[1], root_src_dir.rsplit('/', 1)[0]): #any([i for i in list_items() if i.d_name.decode('utf-8') == and not i.is_dir()]):
+ # any([i for i in list_items() if i.d_name.decode('utf-8') == args.remote_path and not i.is_dir()]):
+ elif is_file_exists(args.remote_path):
+ copy_to_local(self, root_src_dir,
+ root_dst_dir + '/' + root_src_dir)
+ # any([i for i in list_items() if i.d_name.decode('utf-8') == and not i.is_dir()]):
+ elif '/'in root_src_dir and is_file_exists(root_src_dir.rsplit('/', 1)[1], root_src_dir.rsplit('/', 1)[0]):
copy_to_local(self, root_src_dir, root_dst_dir)
else:
files = list(reversed(sorted(dirwalk(root_src_dir))))
dst_dirpath, dst_file = file_.rsplit('/', 1)
if dst_dirpath in files:
files.remove(dst_dirpath)
- if not is_dir_exists(file_) and not os.path.exists(root_dst_dir + '/' +file_):
- copy_to_local(self, file_, re.sub('\/+', '/', root_dst_dir + '/'+ dst_dirpath + '/'+ dst_file))
- elif is_dir_exists(file_) and not os.path.exists(re.sub('\/+', '/', root_dst_dir + '/'+ dst_dirpath + '/' + dst_file)):
- os.makedirs(re.sub('\/+', '/', root_dst_dir + '/'+ dst_dirpath + '/' + dst_file))
+ if not is_dir_exists(file_) and not os.path.exists(root_dst_dir + '/' + file_):
+ copy_to_local(self, file_, re.sub(
+ '\/+', '/', root_dst_dir + '/' + dst_dirpath + '/' + dst_file))
+ elif is_dir_exists(file_) and not os.path.exists(re.sub('\/+', '/', root_dst_dir + '/' + dst_dirpath + '/' + dst_file)):
+ os.makedirs(re.sub('\/+', '/', root_dst_dir +
+ '/' + dst_dirpath + '/' + dst_file))
return 0
def complete_ls(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
return self.complete_filenames(text, line, begidx, endidx)
-
- ls_parser = argparse.ArgumentParser(description = 'Copy a file from Ceph File System from Local Directory.')
- ls_parser.add_argument('-l', '--long', action = 'store_true', help = 'Detailed list of items in the directory.')
- ls_parser.add_argument('-r', '--reverse', action = 'store_true', help = 'Reverse order of listing items in the directory.')
- ls_parser.add_argument('-H', action = 'store_true', help = 'Human Readable')
- ls_parser.add_argument('-a','--all', action = 'store_true', help = 'Do not Ignore entries starting with .')
- ls_parser.add_argument('-S', action = 'store_true', help = 'Sort by file_size')
- ls_parser.add_argument('dir_names', help = 'Name of Directories', nargs = '*', default = [''])
-
+
+ ls_parser = argparse.ArgumentParser(
+ description='Copy a file from Ceph File System from Local Directory.')
+ ls_parser.add_argument('-l', '--long', action='store_true',
+ help='Detailed list of items in the directory.')
+ ls_parser.add_argument('-r', '--reverse', action='store_true',
+ help='Reverse order of listing items in the directory.')
+ ls_parser.add_argument('-H', action='store_true', help='Human Readable')
+ ls_parser.add_argument('-a', '--all', action='store_true',
+ help='Do not Ignore entries starting with .')
+ ls_parser.add_argument('-S', action='store_true', help='Sort by file_size')
+ ls_parser.add_argument(
+ 'dir_names', help='Name of Directories', nargs='*', default=[''])
+
@with_argparser(ls_parser)
def do_ls(self, args):
"""
all_items = get_all_possible_paths(dir_name)
if len(all_items) == 0:
continue
- dir_name = all_items[0].rsplit('/',1)[0]
+ dir_name = all_items[0].rsplit('/', 1)[0]
if dir_name == '':
dir_name = '/'
- items = [item for item in list_items(dir_name) for i in all_items if i.rsplit('/', 1)[1] == item.d_name.decode('utf-8') and not item.is_dir()]
- dirs = [re.sub('\/+', '/', dir_name + '/' + item.d_name.decode('utf-8')) for item in list_items(dir_name) for i in all_items if i.rsplit('/', 1)[1] == item.d_name.decode('utf-8') and item.is_dir()]
+ items = [item for item in list_items(dir_name) for i in all_items if i.rsplit(
+ '/', 1)[1] == item.d_name.decode('utf-8') and not item.is_dir()]
+ dirs = [re.sub('\/+', '/', dir_name + '/' + item.d_name.decode('utf-8')) for item in list_items(dir_name)
+ for i in all_items if i.rsplit('/', 1)[1] == item.d_name.decode('utf-8') and item.is_dir()]
directories.extend(dirs)
if len(dirs) == 0:
self.poutput(dir_name, ':\n')
- items = sorted(items, key = lambda item: item.d_name)
+ items = sorted(items, key=lambda item: item.d_name)
else:
if dir_name != '' and dir_name != cephfs.getcwd().decode('utf-8') and len(directories) > 1:
self.poutput(dir_name, ':\n')
- items = sorted(list_items(dir_name), key = lambda item: item.d_name)
+ items = sorted(list_items(dir_name),
+ key=lambda item: item.d_name)
if not args.all and len(items) > 2:
- items = [i for i in items if not i.d_name.decode('utf-8').startswith('.')]
+ items = [i for i in items if not i.d_name.decode(
+ 'utf-8').startswith('.')]
flag = 0
if args.S:
- items = sorted(items, key = lambda item: cephfs.stat(to_bytes(dir_name + '/' + item.d_name.decode('utf-8'))).st_size)
+ items = sorted(items, key=lambda item: cephfs.stat(
+ to_bytes(dir_name + '/' + item.d_name.decode('utf-8'))).st_size)
if args.reverse:
items = reversed(items)
for item in items:
path = item
if not isinstance(item, str):
- path = item.d_name.decode('utf-8')
+ path = item.d_name.decode('utf-8')
if item.is_dir():
flag = 1
else:
flag = 0
if args.long and args.H:
- print_long(self, cephfs.getcwd().decode('utf-8') + dir_name + '/' + path, flag, True)
+ print_long(self, cephfs.getcwd().decode(
+ 'utf-8') + dir_name + '/' + path, flag, True)
elif args.long:
- print_long(self, cephfs.getcwd().decode('utf-8') + dir_name + '/' + path, flag, False)
+ print_long(self, cephfs.getcwd().decode(
+ 'utf-8') + dir_name + '/' + path, flag, False)
else:
- values.append(colorama.Fore.BLUE * flag + path + '/'* flag + colorama.Style.RESET_ALL * flag)
+ values.append(colorama.Fore.BLUE * flag + path +
+ '/' * flag + colorama.Style.RESET_ALL * flag)
if not args.long:
print_list(self, values, shutil.get_terminal_size().columns)
if dir_name != directories[-1]:
self.poutput('\n')
-
def complete_rmdir(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
return self.complete_filenames(text, line, begidx, endidx)
- rmdir_parser = argparse.ArgumentParser(description = 'Remove Directory.')
- rmdir_parser.add_argument('dir_paths', help = 'Directory Path.',nargs = '+')
- rmdir_parser.add_argument('-p', '--parent', action = 'store_true', help = 'Remove parent directories as necessary. When this option is specified, no error is reported if a directory has any sub-directories, files')
-
+ rmdir_parser = argparse.ArgumentParser(description='Remove Directory.')
+ rmdir_parser.add_argument('dir_paths', help='Directory Path.', nargs='+')
+ rmdir_parser.add_argument('-p', '--parent', action='store_true',
+ help='Remove parent directories as necessary. When this option is specified, no error is reported if a directory has any sub-directories, files')
+
@with_argparser(rmdir_parser)
def do_rmdir(self, args):
"""
is_pattern = True
all_items = get_all_possible_paths(dir_path)
if len(all_items) > 0:
- dir_path = all_items[0].rsplit('/',1)[0]
+ dir_path = all_items[0].rsplit('/', 1)[0]
if dir_path == '':
dir_path = '/'
- dirs = [re.sub('\/+', '/', dir_path + '/' + item.d_name.decode('utf-8')) for item in list_items(dir_path) for i in all_items if i.rsplit('/', 1)[1] == item.d_name.decode('utf-8') and item.is_dir()]
+ dirs = [re.sub('\/+', '/', dir_path + '/' + item.d_name.decode('utf-8')) for item in list_items(dir_path)
+ for i in all_items if i.rsplit('/', 1)[1] == item.d_name.decode('utf-8') and item.is_dir()]
directories.extend(dirs)
continue
else:
is_pattern = False
path = ''
if args.parent:
- files = reversed(sorted(set(dirwalk(cephfs.getcwd().decode('utf-8') + dir_path))))
+ files = reversed(
+ sorted(set(dirwalk(cephfs.getcwd().decode('utf-8') + dir_path))))
for i, path in enumerate(files):
path = re.sub('\/+', '/', path)
- if path[1:] != re.sub('\/+', '/', dir_path):
+ if path[1:] != re.sub('\/+', '/', dir_path):
try:
cephfs.rmdir(to_bytes(path))
except:
- cephfs.unlink(to_bytes(path))
- if not is_pattern and re.sub('\/+', '/', dir_path) != re.sub('\/+', '/', path):
+ cephfs.unlink(to_bytes(path))
+ if not is_pattern and re.sub('\/+', '/', dir_path) != re.sub('\/+', '/', path):
cephfs.rmdir(to_bytes(dir_path))
-
+
def complete_rm(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
return self.complete_filenames(text, line, begidx, endidx)
- rm_parser = argparse.ArgumentParser(description = 'Remove File.')
- rm_parser.add_argument('file_paths', help = 'File Path.', nargs = '+')
+ rm_parser = argparse.ArgumentParser(description='Remove File.')
+ rm_parser.add_argument('file_paths', help='File Path.', nargs='+')
@with_argparser(rm_parser)
def do_rm(self, args):
else:
cephfs.unlink(to_bytes(file_path))
-
def complete_mv(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
return self.complete_filenames(text, line, begidx, endidx)
- mv_parser = argparse.ArgumentParser(description = 'Move File.')
- mv_parser.add_argument('src_path', type = str, help = 'Source File Path.')
- mv_parser.add_argument('dest_path', type = str, help = 'Destination File Path.')
+ mv_parser = argparse.ArgumentParser(description='Move File.')
+ mv_parser.add_argument('src_path', type=str, help='Source File Path.')
+ mv_parser.add_argument('dest_path', type=str,
+ help='Destination File Path.')
@with_argparser(mv_parser)
def do_mv(self, args):
def complete_cd(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
return self.complete_filenames(text, line, begidx, endidx)
- cd_parser = argparse.ArgumentParser(description = 'Change working directory')
- cd_parser.add_argument('dir_name', type = str, help = 'Name of the directory.', default = '')
+ cd_parser = argparse.ArgumentParser(description='Change working directory')
+ cd_parser.add_argument('dir_name', type=str,
+ help='Name of the directory.', default='')
@with_argparser(cd_parser)
def do_cd(self, args):
cephfs.chdir(to_bytes(args.dir_name))
self.working_dir = cephfs.getcwd().decode('utf-8')
self.set_prompt()
-
-
+
def do_cwd(self, arglist):
"""
Get current working directory.
def complete_chmod(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
return self.complete_filenames(text, line, begidx, endidx)
- chmod_parser = argparse.ArgumentParser(description = 'Create Directory.')
- chmod_parser.add_argument('mode', type = int, help = 'Mode')
- chmod_parser.add_argument('file_name', type = str, help = 'Name of the file')
+ chmod_parser = argparse.ArgumentParser(description='Create Directory.')
+ chmod_parser.add_argument('mode', type=int, help='Mode')
+ chmod_parser.add_argument('file_name', type=str, help='Name of the file')
@with_argparser(chmod_parser)
def do_chmod(self, args):
"""
cephfs.chmod(args.file_name, args.mode)
-
def complete_cat(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
return self.complete_filenames(text, line, begidx, endidx)
- cat_parser = argparse.ArgumentParser(description = '')
- cat_parser.add_argument('file_names', help = 'Name of Files', nargs = '+')
-
+ cat_parser = argparse.ArgumentParser(description='')
+ cat_parser.add_argument('file_names', help='Name of Files', nargs='+')
+
@with_argparser(cat_parser)
def do_cat(self, args):
"""
self.poutput(file_name)
copy_to_local(self, file_name, '-')
-
- umask_parser = argparse.ArgumentParser(description = 'Set umask value.')
- umask_parser.add_argument('mode', help = 'Mode', action = 'store', nargs = '?', default = '')
+ umask_parser = argparse.ArgumentParser(description='Set umask value.')
+ umask_parser.add_argument(
+ 'mode', help='Mode', action='store', nargs='?', default='')
@with_argparser(umask_parser)
def do_umask(self, args):
Set Umask value.
"""
if args.mode == '':
- self.poutput(self.umask.zfill(4))
- else:
+ self.poutput(self.umask.zfill(4))
+ else:
mode = int(args.mode, 8)
self.umask = str(oct(cephfs.umask(mode))[2:])
-
def complete_write(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
return self.complete_filenames(text, line, begidx, endidx)
- write_parser = argparse.ArgumentParser(description = '')
- write_parser.add_argument('file_name', type = str, help = 'Name of File')
+ write_parser = argparse.ArgumentParser(description='')
+ write_parser.add_argument('file_name', type=str, help='Name of File')
@with_argparser(write_parser)
def do_write(self, args):
copy_from_local(self, '-', args.file_name)
-
def complete_lcd(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
index_dict = {1: self.path_complete}
return self.index_based_complete(text, line, begidx, endidx, index_dict)
- lcd_parser = argparse.ArgumentParser(description = '')
- lcd_parser.add_argument('path', type = str, help = 'Path')
+ lcd_parser = argparse.ArgumentParser(description='')
+ lcd_parser.add_argument('path', type=str, help='Path')
@with_argparser(lcd_parser)
def do_lcd(self, args):
os.chdir(path)
# self.poutput(get_all_possible_paths(args.path))
-
def complete_lls(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
index_dict = {1: self.path_complete}
return self.index_based_complete(text, line, begidx, endidx, index_dict)
- lls_parser = argparse.ArgumentParser(description = 'List files in local system.')
- lls_parser.add_argument('paths', help = 'Paths', nargs = '*', default = [''])
+ lls_parser = argparse.ArgumentParser(
+ description='List files in local system.')
+ lls_parser.add_argument('paths', help='Paths', nargs='*', default=[''])
@with_argparser(lls_parser)
def do_lls(self, args):
args.paths.append(os.getcwd())
for path in args.paths:
if os.path.isabs(path):
- path = os.path.relpath(os.getcwd(), '/'+ path)
+ path = os.path.relpath(os.getcwd(), '/' + path)
items = os.listdir(path)
- print_list(self, items)
+ print_list(self, items)
-
def do_lpwd(self, arglist):
"""
Prints the absolute path of the current local directory
"""
for index, i in enumerate(list_items(cephfs.getcwd())[2:]):
if index == 0:
- self.poutput('{:25s}\t{:5s}\t{:15s}{:10s}{}'.format("1K-blocks", "Used", "Available", "Use%", "Stored on"))
+ self.poutput('{:25s}\t{:5s}\t{:15s}{:10s}{}'.format(
+ "1K-blocks", "Used", "Available", "Use%", "Stored on"))
if not is_dir_exists(i.d_name):
statfs = cephfs.statfs(i.d_name)
stat = cephfs.stat(i.d_name)
use = 0
if block_size > 0:
use = (stat.st_size*100 // block_size)
- self.poutput('{:25d}\t{:5d}\t{:10d}\t{:5s} {}'.format(statfs['f_fsid'], stat.st_size, available, str(int(use)) + '%', i.d_name.decode('utf-8')))
-
+ self.poutput('{:25d}\t{:5d}\t{:10d}\t{:5s} {}'.format(
+ statfs['f_fsid'], stat.st_size, available, str(int(use)) + '%', i.d_name.decode('utf-8')))
- locate_parser = argparse.ArgumentParser(description = 'Find file within file system')
- locate_parser.add_argument('name', help = 'name', type = str)
- locate_parser.add_argument('-c', '--count', action = 'store_true', help = 'Count list of items located.')
- locate_parser.add_argument('-i', '--ignorecase', action = 'store_true', help = 'Ignore case')
+ locate_parser = argparse.ArgumentParser(
+ description='Find file within file system')
+ locate_parser.add_argument('name', help='name', type=str)
+ locate_parser.add_argument(
+ '-c', '--count', action='store_true', help='Count list of items located.')
+ locate_parser.add_argument(
+ '-i', '--ignorecase', action='store_true', help='Ignore case')
@with_argparser(locate_parser)
def do_locate(self, args):
if args.name[0] == '*':
args.name += '/'
elif args.name[-1] == '*':
- args.name = '/'+ args.name
+ args.name = '/' + args.name
args.name = args.name.replace('*', '')
if args.ignorecase:
- locations = [i for i in sorted(set(dirwalk(cephfs.getcwd().decode('utf-8')))) if args.name.lower() in i.lower()]
+ locations = [i for i in sorted(
+ set(dirwalk(cephfs.getcwd().decode('utf-8')))) if args.name.lower() in i.lower()]
else:
- locations = [i for i in sorted(set(dirwalk(cephfs.getcwd().decode('utf-8')))) if args.name in i]
+ locations = [i for i in sorted(
+ set(dirwalk(cephfs.getcwd().decode('utf-8')))) if args.name in i]
if args.count:
self.poutput(len(locations))
else:
def complete_du(self, text, line, begidx, endidx):
"""
auto complete of file name.
- """
+ """
return self.complete_filenames(text, line, begidx, endidx)
- du_parser = argparse.ArgumentParser(description = 'Disk Usage of a Directory')
- du_parser.add_argument('dirs', type = str, help = 'Name of the directory.', nargs = '?', default = '')
- du_parser.add_argument('-r', action = 'store_true', help = 'Recursive Disk usage of all directories.')
-
+ du_parser = argparse.ArgumentParser(
+ description='Disk Usage of a Directory')
+ du_parser.add_argument(
+ 'dirs', type=str, help='Name of the directory.', nargs='?', default='')
+ du_parser.add_argument('-r', action='store_true',
+ help='Recursive Disk usage of all directories.')
+
@with_argparser(du_parser)
def do_du(self, args):
"""
Disk Usage of a Directory
"""
if args.dirs == '':
- args.dirs = cephfs.getcwd().decode('utf-8')
+ args.dirs = cephfs.getcwd().decode('utf-8')
for dir_ in args.dirs:
- if args.r:
+ if args.r:
for i in reversed(sorted(set(dirwalk(dir_)))):
try:
- self.poutput('{:10s} {}'.format(humansize(int(cephfs.getxattr(to_bytes(i), 'ceph.dir.rbytes').decode('utf-8'))), '.' + re.sub('\/+', '/', i)))
+ self.poutput('{:10s} {}'.format(humansize(int(cephfs.getxattr(to_bytes(
+ i), 'ceph.dir.rbytes').decode('utf-8'))), '.' + re.sub('\/+', '/', i)))
except:
continue
else:
- self.poutput('{:10s} {}'.format(humansize(int(cephfs.getxattr(to_bytes(dir_), 'ceph.dir.rbytes').decode('utf-8'))), '.' + re.sub('\/+', '/', dir_)))
-
+ self.poutput('{:10s} {}'.format(humansize(int(cephfs.getxattr(to_bytes(
+ dir_), 'ceph.dir.rbytes').decode('utf-8'))), '.' + re.sub('\/+', '/', dir_)))
+
def do_help(self, line):
"""
Get details about a command.
parser.print_help()
else:
super().do_help(line)
-
+
+
if __name__ == '__main__':
config_file = ''
if sys.argv[1] == '-c':
setup_cephfs(config_file)
sys.argv = [sys.argv[0]]
c = CephFSShell()
- c.cmdloop()
\ No newline at end of file
+ c.cmdloop()