import argparse
import os
+import os.path
import sys
from cmd2 import Cmd, with_argparser
import cephfs as libcephfs
def setup_cephfs(config_file):
- """
- Mouting a cephfs
+ """
+ Mouting a cephfs
"""
global cephfs
cephfs = libcephfs.LibCephFS(conffile=config_file)
def mode_notation(mode):
"""
"""
- permission_bits = {'0': '---', '1': '--x', '2': '-w-',
- '3': '-wx', '4': 'r--', '5': 'r-x', '6': 'rw-', '7': 'rwx'}
+ permission_bits = {'0': '---',
+ '1': '--x',
+ '2': '-w-',
+ '3': '-wx',
+ '4': 'r--',
+ '5': 'r-x',
+ '6': 'rw-',
+ '7': 'rwx'}
mode = str(oct(mode))
notation = '-'
if mode[2] == '4':
if isinstance(dir_name, bytes):
dir_name = dir_name.decode('utf-8')
paths = []
- parent_dir = dir_name.rsplit('/', 1)[0]
+ parent_dir = os.path.dirname(dir_name)
if parent_dir == '':
parent_dir = '/'
- if dir_name == '/' or is_dir_exists(dir_name.rsplit('/', 1)[1], parent_dir):
+ if dir_name == '/' or is_dir_exists(os.path.basename(dir_name), parent_dir):
for i in list_items(dir_name)[2:]:
if fnmatch.fnmatch(i.d_name.decode('utf-8'), pattern):
- paths.append(re.sub('\/+', '/', dir_name +
- '/' + i.d_name.decode('utf-8')))
+ paths.append(os.path.join(dir_name, i.d_name.decode('utf-8')))
return paths
def get_all_possible_paths(pattern):
paths = []
- dir_ = cephfs.getcwd()
- is_rel_path = True
- if pattern[0] == '/':
+ is_rel_path = not os.path.isabs(pattern)
+ if is_rel_path:
+ dir_ = cephfs.getcwd()
+ else:
dir_ = '/'
pattern = pattern[1:]
- is_rel_path = False
patterns = pattern.split('/')
paths.extend(glob(dir_, patterns[0]))
patterns.pop(0)
if not isinstance(file_name, bytes):
file_name = to_bytes(file_name)
info = cephfs.stat(file_name)
+ file_name = os.path.basename(file_name.decode('utf-8'))
if flag:
- file_name = colorama.Fore.BLUE + \
- file_name.decode('utf-8').rsplit('/',
- 1)[1] + '/' + colorama.Style.RESET_ALL
- else:
- file_name = file_name.decode('utf-8').rsplit('/', 1)[1]
+ file_name = colorama.Fore.BLUE + file_name + '/' + colorama.Style.RESET_ALL
if human_readable:
- shell.poutput('{}\t{:10s} {} {} {} {}'.format(mode_notation(info.st_mode), humansize(
- info.st_size), info.st_uid, info.st_gid, info.st_mtime, file_name, sep='\t'))
+ shell.poutput('{}\t{:10s} {} {} {} {}'.format(
+ mode_notation(info.st_mode),
+ humansize(info.st_size), info.st_uid,
+ info.st_gid, info.st_mtime, file_name, sep='\t'))
else:
- shell.poutput('{} {:12d} {} {} {} {}'.format(mode_notation(
- info.st_mode), info.st_size, info.st_uid, info.st_gid, info.st_mtime, file_name, sep='\t'))
+ shell.poutput('{} {:12d} {} {} {} {}'.format(
+ mode_notation(info.st_mode), info.st_size, info.st_uid,
+ info.st_gid, info.st_mtime, file_name, sep='\t'))
def word_len(word):
dir_ = to_bytes(dir_)
if not isinstance(file_name, bytes):
if file_name.count('/') > 0:
- file_name = to_bytes(file_name.rsplit('/', 1)[1])
+ file_name = to_bytes(os.path.basename(file_name))
else:
file_name = to_bytes(file_name)
return len([i for i in set(list_items(dir_)) if i.d_name == file_name and not i.is_dir()]) > 0
word = words[i]
if word[0] == '\x1b':
shell.poutput(
- '%-*s' % (width + 9, words[i]), end='\n'if i + nrows >= nwords else '')
+ '%-*s' % (width + 9, words[i]), end='\n' if i + nrows >= nwords else '')
else:
shell.poutput(
- '%-*s' % (width, words[i]), end='\n'if i + nrows >= nwords else '')
+ '%-*s' % (width, words[i]), end='\n' if i + nrows >= nwords else '')
def copy_from_local(shell, local_path, remote_path):
def copy_to_local(shell, remote_path, local_path):
- if not os.path.exists(local_path.rsplit('/', 1)[0]):
- os.makedirs(local_path.rsplit('/', 1)[0], exist_ok=True)
+ local_dir = os.path.dirname(local_path)
+ if not os.path.exists(local_dir):
+ os.makedirs(local_dir, exist_ok=True)
+
fd = None
if len(remote_path.rsplit('/', 1)) > 2 and remote_path.rsplit('/', 1)[1] == '':
return
fd.close()
-def dirwalk(dir_name, giveDirs=0):
+def dirwalk(dir_name):
"""
walk a directory tree, using a generator
"""
- dir_name = re.sub('\/+', '/', dir_name)
+ dir_name = os.path.normpath(dir_name)
for item in list_items(dir_name)[2:]:
- fullpath = dir_name + '/' + item.d_name.decode('utf-8')
+ fullpath = os.path.join(dir_name, item.d_name.decode('utf-8'))
yield fullpath.rsplit('/', 1)[0] + '/'
if is_dir_exists(item.d_name, fullpath.rsplit('/', 1)[0]):
if not len(list_items(fullpath)[2:]):
- yield re.sub('\/+', '/', fullpath)
+ yield fullpath
else:
- for x in dirwalk(fullpath):
- if giveDirs:
- yield x
- else:
- yield x
+ return dirwalk(fullpath)
else:
- yield re.sub('\/+', '/', fullpath)
+ yield fullpath
class CephFSShell(Cmd):
self.poutput('Unrecognized command:', line)
def set_prompt(self):
- self.prompt = '\033[01;33mCephFS:~' + colorama.Fore.LIGHTCYAN_EX + \
- self.working_dir + colorama.Style.RESET_ALL + \
- '\033[01;33m>>>\033[00m '
+ self.prompt = ('\033[01;33mCephFS:~' + colorama.Fore.LIGHTCYAN_EX +
+ self.working_dir + colorama.Style.RESET_ALL + '\033[01;33m>>>\033[00m ')
def create_argparser(self, command):
try:
traceback.print_exc(file=sys.stdout)
def complete_mkdir(self, text, line, begidx, endidx):
- """
+ """
auto complete of file name.
"""
return self.complete_filenames(text, line, begidx, endidx)
mkdir_parser = argparse.ArgumentParser(
description='Create the directory(ies), if they do not already exist.')
- mkdir_parser.add_argument(
- 'dirs', type=str, metavar='DIR_NAME', help='Name of new_directory.', nargs='+')
- mkdir_parser.add_argument('-m', '--mode', action='store',
- help='Sets the access mode for the new directory.', type=str)
+ mkdir_parser.add_argument('dirs', type=str,
+ metavar='DIR_NAME',
+ help='Name of new_directory.',
+ nargs='+')
+ mkdir_parser.add_argument('-m', '--mode', type=str,
+ action='store',
+ help='Sets the access mode for the new directory.')
mkdir_parser.add_argument('-p', '--parent', action='store_true',
- help='Create parent directories as necessary. When this option is specified, no error is reported if a directory already exists.')
+ help='Create parent directories as necessary. \
+When this option is specified, no error is reported if a directory already \
+exists.')
@with_argparser(mkdir_parser)
def do_mkdir(self, args):
cephfs.mkdir(path, permission)
def complete_put(self, text, line, begidx, endidx):
- """
+ """
auto complete of file name.
"""
index_dict = {1: self.path_complete}
@with_argparser(put_parser)
def do_put(self, args):
- """
+ """
Copy a file to Ceph File System from Local Directory.
"""
root_src_dir = args.local_path
cephfs.mkdirs(to_bytes(dst_dir), 0o777)
for file_ in files:
src_file = os.path.join(src_dir, file_)
- dst_file = re.sub('\/+', '/', '/' + dst_dir + '/' + file_)
- if (not args.force) and is_file_exists(re.sub('\/+', '/', dst_file)):
+ dst_file = os.path.join(dst_dir, file_)
+ if (not args.force) and is_file_exists(dst_file):
return
- copy_from_local(
- self, src_file, re.sub('\/+', '/', dst_file))
+ copy_from_local(self, src_file, dst_file)
def complete_get(self, text, line, begidx, endidx):
- """
+ """
auto complete of file name.
"""
return self.complete_filenames(text, line, begidx, endidx)
@with_argparser(get_parser)
def do_get(self, args):
- """
+ """
Copy a file/directory from Ceph File System to Local Directory.
"""
root_src_dir = args.remote_path
dst_dirpath, dst_file = file_.rsplit('/', 1)
if dst_dirpath in files:
files.remove(dst_dirpath)
- if not is_dir_exists(file_) and not os.path.exists(root_dst_dir + '/' + file_):
- copy_to_local(self, file_, re.sub(
- '\/+', '/', root_dst_dir + '/' + dst_dirpath + '/' + dst_file))
- elif is_dir_exists(file_) and not os.path.exists(re.sub('\/+', '/', root_dst_dir + '/' + dst_dirpath + '/' + dst_file)):
- os.makedirs(re.sub('\/+', '/', root_dst_dir +
- '/' + dst_dirpath + '/' + dst_file))
+ dst_path = os.path.join(root_dst_dir, dst_dirpath, dst_file)
+ dst_path = os.path.normpath(dst_path)
+ if os.path.exists(dst_path):
+ continue
+ if is_dir_exists(file_):
+ os.makedirs(dst_path)
+ else:
+ copy_to_local(self, file_, dst_path)
return 0
def complete_ls(self, text, line, begidx, endidx):
@with_argparser(ls_parser)
def do_ls(self, args):
- """
+ """
List all the files and directories in the current working directory
"""
directories = args.dir_names
dir_name = all_items[0].rsplit('/', 1)[0]
if dir_name == '':
dir_name = '/'
- items = [item for item in list_items(dir_name) for i in all_items if i.rsplit(
- '/', 1)[1] == item.d_name.decode('utf-8') and not item.is_dir()]
- dirs = [re.sub('\/+', '/', dir_name + '/' + item.d_name.decode('utf-8')) for item in list_items(dir_name)
- for i in all_items if i.rsplit('/', 1)[1] == item.d_name.decode('utf-8') and item.is_dir()]
- directories.extend(dirs)
- if len(dirs) == 0:
+ dirs = []
+ for i in all_items:
+ for item in list_items(dir_name):
+ d_name = item.d_name.decode('utf-8')
+ if os.path.basename(i) == d_name:
+ if item.is_dir():
+ dirs.append(os.path.join(dir_name, d_name))
+ else:
+ items.append(item)
+ if dirs:
+ directories.extend(dirs)
+ else:
self.poutput(dir_name, ':\n')
items = sorted(items, key=lambda item: item.d_name)
else:
self.poutput('\n')
def complete_rmdir(self, text, line, begidx, endidx):
- """
+ """
auto complete of file name.
"""
return self.complete_filenames(text, line, begidx, endidx)
rmdir_parser = argparse.ArgumentParser(description='Remove Directory.')
rmdir_parser.add_argument('dir_paths', help='Directory Path.', nargs='+')
rmdir_parser.add_argument('-p', '--parent', action='store_true',
- help='Remove parent directories as necessary. When this option is specified, no error is reported if a directory has any sub-directories, files')
+ help='Remove parent directories as necessary. \
+When this option is specified, no error is reported if a directory has any \
+sub-directories, files')
@with_argparser(rmdir_parser)
def do_rmdir(self, args):
- """
+ """
Remove a specific Directory
"""
is_pattern = False
dir_path = all_items[0].rsplit('/', 1)[0]
if dir_path == '':
dir_path = '/'
- dirs = [re.sub('\/+', '/', dir_path + '/' + item.d_name.decode('utf-8')) for item in list_items(dir_path)
- for i in all_items if i.rsplit('/', 1)[1] == item.d_name.decode('utf-8') and item.is_dir()]
+ dirs = []
+ for i in all_items:
+ for item in list_items(dir_name):
+ d_name = item.d_name.decode('utf-8')
+ if os.path.basename(i) == d_name:
+ if item.is_dir():
+ dirs.append(os.path.join(dir_name, d_name))
directories.extend(dirs)
continue
else:
is_pattern = False
path = ''
+ dir_path = os.path.normpath(dir_path)
if args.parent:
files = reversed(
sorted(set(dirwalk(cephfs.getcwd().decode('utf-8') + dir_path))))
- for i, path in enumerate(files):
- path = re.sub('\/+', '/', path)
- if path[1:] != re.sub('\/+', '/', dir_path):
+ for path in files:
+ path = os.path.normpath(path)
+ if path[1:] != dir_path:
try:
cephfs.rmdir(to_bytes(path))
- except:
+ except libcephfs.Error:
cephfs.unlink(to_bytes(path))
- if not is_pattern and re.sub('\/+', '/', dir_path) != re.sub('\/+', '/', path):
+ if not is_pattern and dir_path != os.path.normpath(path):
cephfs.rmdir(to_bytes(dir_path))
def complete_rm(self, text, line, begidx, endidx):
- """
+ """
auto complete of file name.
"""
return self.complete_filenames(text, line, begidx, endidx)
cephfs.rename(to_bytes(args.src_path), to_bytes(args.dest_path))
def complete_cd(self, text, line, begidx, endidx):
- """
+ """
auto complete of file name.
"""
return self.complete_filenames(text, line, begidx, endidx)
@with_argparser(cd_parser)
def do_cd(self, args):
- """
+ """
Change working directory
"""
if args.dir_name == '':
cephfs.chmod(args.file_name, args.mode)
def complete_cat(self, text, line, begidx, endidx):
- """
+ """
auto complete of file name.
"""
return self.complete_filenames(text, line, begidx, endidx)
self.umask = str(oct(cephfs.umask(mode))[2:])
def complete_write(self, text, line, begidx, endidx):
- """
+ """
auto complete of file name.
"""
return self.complete_filenames(text, line, begidx, endidx)
@with_argparser(write_parser)
def do_write(self, args):
- """
+ """
Write data into a file.
"""
copy_from_local(self, '-', args.file_name)
def complete_lcd(self, text, line, begidx, endidx):
- """
+ """
auto complete of file name.
"""
index_dict = {1: self.path_complete}
# self.poutput(get_all_possible_paths(args.path))
def complete_lls(self, text, line, begidx, endidx):
- """
+ """
auto complete of file name.
"""
index_dict = {1: self.path_complete}
def do_df(self, arglist):
"""
- Display the amount of available disk space for file systems
+ Display the amount of available disk space for file systems
"""
for index, i in enumerate(list_items(cephfs.getcwd())[2:]):
if index == 0:
if block_size > 0:
use = (stat.st_size*100 // block_size)
self.poutput('{:25d}\t{:5d}\t{:10d}\t{:5s} {}'.format(
- statfs['f_fsid'], stat.st_size, available, str(int(use)) + '%', i.d_name.decode('utf-8')))
+ statfs['f_fsid'], stat.st_size, available,
+ str(int(use)) + '%', i.d_name.decode('utf-8')))
locate_parser = argparse.ArgumentParser(
description='Find file within file system')
self.poutput('\n'.join(locations))
def complete_du(self, text, line, begidx, endidx):
- """
+ """
auto complete of file name.
"""
return self.complete_filenames(text, line, begidx, endidx)
for dir_ in args.dirs:
if args.r:
for i in reversed(sorted(set(dirwalk(dir_)))):
+ i = os.path.normpath(i)
try:
- self.poutput('{:10s} {}'.format(humansize(int(cephfs.getxattr(to_bytes(
- i), 'ceph.dir.rbytes').decode('utf-8'))), '.' + re.sub('\/+', '/', i)))
- except:
+ xattr = cephfs.getxattr(to_bytes(i), 'ceph.dir.rbytes')
+ self.poutput('{:10s} {}'.format(
+ humansize(int(xattr.decode('utf-8'))), '.' + i))
+ except libcephfs.Error:
continue
else:
+ dir_ = os.path.normpath(dir_)
self.poutput('{:10s} {}'.format(humansize(int(cephfs.getxattr(to_bytes(
- dir_), 'ceph.dir.rbytes').decode('utf-8'))), '.' + re.sub('\/+', '/', dir_)))
+ dir_), 'ceph.dir.rbytes').decode('utf-8'))), '.' + dir_))
def do_help(self, line):
"""