except cephfs.ObjectNotFound:
return []
-def glob(dir_name, pattern):
- if isinstance(dir_name, bytes):
- dir_name = dir_name.decode('utf-8')
+def glob(path, pattern):
+ if isinstance(path, bytes):
+ path = path.decode('utf-8')
paths = []
- parent_dir = os.path.dirname(dir_name)
+ parent_dir = os.path.dirname(path)
if parent_dir == '':
parent_dir = '/'
- if dir_name == '/' or is_dir_exists(os.path.basename(dir_name),
+ if path == '/' or is_dir_exists(os.path.basename(path),
parent_dir):
- for i in ls(dir_name, opts='A'):
+ for i in ls(path, opts='A'):
if fnmatch.fnmatch(i.d_name, pattern):
- paths.append(os.path.join(dir_name, i.d_name))
+ paths.append(os.path.join(path, i.d_name))
return paths
return len(word)
-def is_dir_exists(dir_name, dir_=''):
- if isinstance(dir_name, bytes):
- dir_name = dir_name.decode('utf-8')
+def is_dir_exists(path, dir_=''):
+ if isinstance(path, bytes):
+ path = path.decode('utf-8')
if isinstance(dir_, bytes):
dir_ = dir_.decode('utf-8')
- path_to_stat = os.path.join(dir_, dir_name)
+ path_to_stat = os.path.join(dir_, path)
try:
return ((cephfs.stat(path_to_stat).st_mode & 0o0040000) != 0)
except libcephfs.Error:
return False
-def is_file_exists(file_name, dir_=''):
- if isinstance(file_name, bytes):
- file_name = file_name.decode('utf-8')
+def is_file_exists(path, dir_=''):
+ if isinstance(path, bytes):
+ path = path.decode('utf-8')
if isinstance(dir_, bytes):
dir_ = dir_.decode('utf-8')
try:
# if its not a directory, then its a file
- return ((cephfs.stat(os.path.join(dir_, file_name)).st_mode & 0o0040000) == 0)
+ return ((cephfs.stat(os.path.join(dir_, path)).st_mode & 0o0040000) == 0)
except libcephfs.Error:
return False
fd.close()
-def dirwalk(dir_name):
+def dirwalk(path):
"""
walk a directory tree, using a generator
"""
- dir_name = os.path.normpath(dir_name)
- for item in ls(dir_name, opts='A'):
- fullpath = os.path.join(dir_name, item.d_name.decode('utf-8'))
+ path = os.path.normpath(path)
+ for item in ls(path, opts='A'):
+ fullpath = os.path.join(path, item.d_name.decode('utf-8'))
src_path = fullpath.rsplit('/', 1)[0]
yield os.path.normpath(fullpath)
ls_parser.add_argument('-a', '--all', action='store_true',
help='Do not Ignore entries starting with .')
ls_parser.add_argument('-S', action='store_true', help='Sort by file_size')
- ls_parser.add_argument(
- 'dir_names', help='Name of Directories', nargs='*', default=[''])
+ ls_parser.add_argument('paths', help='Name of Directories', nargs='*', default=['.'])
@with_argparser(ls_parser)
def do_ls(self, args):
"""
List all the files and directories in the current working directory
"""
- directories = args.dir_names
- for dir_name in directories:
+ paths = args.paths
+ for path in paths:
values = []
items = []
- if dir_name.count('*') > 0:
- all_items = get_all_possible_paths(dir_name)
+ if path.count('*') > 0:
+ all_items = get_all_possible_paths(path)
if len(all_items) == 0:
continue
- dir_name = all_items[0].rsplit('/', 1)[0]
- if dir_name == '':
- dir_name = '/'
+ path = all_items[0].rsplit('/', 1)[0]
+ if path == '':
+ path = '/'
dirs = []
for i in all_items:
- for item in ls(dir_name):
+ for item in ls(path):
d_name = item.d_name.decode('utf-8')
if os.path.basename(i) == d_name:
if item.is_dir():
- dirs.append(os.path.join(dir_name, d_name))
+ dirs.append(os.path.join(path, d_name))
else:
items.append(item)
if dirs:
- directories.extend(dirs)
+ paths.extend(dirs)
else:
- self.poutput(dir_name, ':\n')
+ self.poutput(path, ':\n')
items = sorted(items, key=lambda item: item.d_name)
else:
- if dir_name != '' and dir_name != cephfs.getcwd().decode(
- 'utf-8') and len(directories) > 1:
- self.poutput(dir_name, ':\n')
- items = sorted(ls(dir_name),
+ if path != '' and path != cephfs.getcwd().decode(
+ 'utf-8') and len(paths) > 1:
+ self.poutput(path, ':\n')
+ items = sorted(ls(path),
key=lambda item: item.d_name)
if not args.all:
items = [i for i in items if not i.d_name.startswith(b'.')]
if args.S:
items = sorted(items, key=lambda item: cephfs.stat(
- dir_name + '/' + item.d_name.decode('utf-8')).st_size)
+ path + '/' + item.d_name.decode('utf-8')).st_size)
if args.reverse:
items = reversed(items)
if args.long and args.H:
print_long(cephfs.getcwd().decode(
- 'utf-8') + dir_name + '/' + path, is_dir, True)
+ 'utf-8') + path + '/' + path, is_dir, True)
elif args.long:
print_long(cephfs.getcwd().decode('utf-8') +
- dir_name +
+ path +
'/' +
path,
is_dir, False)
values.append(path)
if not args.long:
print_list(values, shutil.get_terminal_size().columns)
- if dir_name != directories[-1]:
+ if path != paths[-1]:
self.poutput('\n')
def complete_rmdir(self, text, line, begidx, endidx):
return self.complete_filenames(text, line, begidx, endidx)
rmdir_parser = argparse.ArgumentParser(description='Remove Directory.')
- rmdir_parser.add_argument('dir_paths', help='Directory Path.', nargs='+')
+ rmdir_parser.add_argument('paths', help='Directory Path.', nargs='+')
rmdir_parser.add_argument('-p', '--parent', action='store_true',
help='Remove parent directories as necessary. \
When this option is specified, no error is reported if a directory has any \
Remove a specific Directory
"""
is_pattern = False
- directories = args.dir_paths
- for dir_path in directories:
- if dir_path.count('*') > 0:
+ paths = args.paths
+ for path in paths:
+ if path.count('*') > 0:
is_pattern = True
- all_items = get_all_possible_paths(dir_path)
+ all_items = get_all_possible_paths(path)
if len(all_items) > 0:
- dir_path = all_items[0].rsplit('/', 1)[0]
- if dir_path == '':
- dir_path = '/'
+ path = all_items[0].rsplit('/', 1)[0]
+ if path == '':
+ path = '/'
dirs = []
for i in all_items:
- for item in ls(dir_path):
+ for item in ls(path):
d_name = item.d_name
if os.path.basename(i) == d_name:
if item.is_dir():
- dirs.append(os.path.join(dir_path, d_name))
- directories.extend(dirs)
+ dirs.append(os.path.join(path, d_name))
+ paths.extend(dirs)
continue
else:
is_pattern = False
path = ''
- dir_path = os.path.normpath(os.path.join(
- cephfs.getcwd().decode('utf-8'), dir_path))
+ path = os.path.normpath(os.path.join(
+ cephfs.getcwd().decode('utf-8'), path))
if args.parent:
files = reversed(
- sorted(set(dirwalk(dir_path))))
+ sorted(set(dirwalk(path))))
for path in files:
path = os.path.normpath(path)
- if path[1:] != dir_path:
+ if path[1:] != path:
try:
cephfs.rmdir(path)
except libcephfs.Error:
cephfs.unlink(path)
- if not is_pattern and dir_path != os.path.normpath(path):
+ if not is_pattern and path != os.path.normpath(path):
try:
- cephfs.rmdir(dir_path)
+ cephfs.rmdir(path)
except libcephfs.Error:
- self.poutput('error: no such directory "%s"' % dir_path)
+ self.poutput('error: no such directory "%s"' % path)
def complete_rm(self, text, line, begidx, endidx):
"""
return self.complete_filenames(text, line, begidx, endidx)
rm_parser = argparse.ArgumentParser(description='Remove File.')
- rm_parser.add_argument('file_paths', help='File Path.', nargs='+')
+ rm_parser.add_argument('paths', help='File Path.', nargs='+')
@with_argparser(rm_parser)
def do_rm(self, args):
"""
Remove a specific file
"""
- files = args.file_paths
- for file_path in files:
- if file_path.count('*') > 0:
+ for path in args.paths:
+ if path.count('*') > 0:
files.extend([i for i in get_all_possible_paths(
- file_path) if is_file_exists(i)])
+ path) if is_file_exists(i)])
else:
try:
- cephfs.unlink(file_path)
+ cephfs.unlink(path)
except libcephfs.Error:
- self.poutput('%s: no such file' % file_path)
+ self.poutput('%s: no such file' % path)
def complete_mv(self, text, line, begidx, endidx):
"""
return self.complete_filenames(text, line, begidx, endidx)
cd_parser = argparse.ArgumentParser(description='Change working directory')
- cd_parser.add_argument('dir_name', type=str,
- help='Name of the directory.', default='')
+ cd_parser.add_argument('path', type=str, help='Name of the directory.', default='/')
@with_argparser(cd_parser)
def do_cd(self, args):
"""
Change working directory
"""
- if args.dir_name == '':
- cephfs.chdir(b'/')
- if args.dir_name == '..':
- dir_name = cephfs.getcwd().decode('utf-8').rsplit('/', 1)[0]
- if dir_name != '':
- cephfs.chdir(dir_name)
- else:
- cephfs.chdir(b'/')
- else:
- try:
- cephfs.chdir(args.dir_name)
- except libcephfs.Error:
- self.poutput("%s: no such directory" % args.dir_name)
- self.working_dir = cephfs.getcwd().decode('utf-8')
- self.set_prompt()
+ try:
+ cephfs.chdir(args.path)
+ self.working_dir = cephfs.getcwd().decode('utf-8')
+ self.set_prompt()
+ except libcephfs.Error:
+ self.poutput("%s: no such directory" % args.path)
def do_cwd(self, arglist):
"""
chmod_parser = argparse.ArgumentParser(description='Create Directory.')
chmod_parser.add_argument('mode', type=str, action=ModeAction, help='Mode')
- chmod_parser.add_argument('file_name', type=str, help='Name of the file')
+ chmod_parser.add_argument('paths', type=str, help='Name of the file', nargs="+")
@with_argparser(chmod_parser)
def do_chmod(self, args):
"""
Change permission of a file
"""
- mode = int(args.mode, base=8)
- try:
- cephfs.chmod(args.file_name, mode)
- except libcephfs.Error:
- self.poutput('%s: no such file or directory' % args.file_name)
+ for path in args.paths:
+ mode = int(args.mode, base=8)
+ try:
+ cephfs.chmod(path, mode)
+ except libcephfs.Error:
+ self.poutput('%s: no such file or directory' % path)
def complete_cat(self, text, line, begidx, endidx):
"""
return self.complete_filenames(text, line, begidx, endidx)
cat_parser = argparse.ArgumentParser(description='')
- cat_parser.add_argument('file_names', help='Name of Files', nargs='+')
+ cat_parser.add_argument('paths', help='Name of Files', nargs='+')
@with_argparser(cat_parser)
def do_cat(self, args):
"""
Print contents of a file
"""
- for file_name in args.file_names:
- if is_file_exists(file_name):
- copy_to_local(file_name, '-')
+ for path in args.paths:
+ if is_file_exists(path):
+ copy_to_local(path, '-')
else:
- self.poutput('%s: no such file' % file_name)
+ self.poutput('%s: no such file' % path)
umask_parser = argparse.ArgumentParser(description='Set umask value.')
umask_parser.add_argument('mode', help='Mode', type=str, action=ModeAction,
return self.complete_filenames(text, line, begidx, endidx)
write_parser = argparse.ArgumentParser(description='')
- write_parser.add_argument('file_name', type=str, help='Name of File')
+ write_parser.add_argument('path', type=str, help='Name of File')
@with_argparser(write_parser)
def do_write(self, args):
Write data into a file.
"""
- copy_from_local('-', args.file_name)
+ copy_from_local('-', args.path)
def complete_lcd(self, text, line, begidx, endidx):
"""
du_parser = argparse.ArgumentParser(
description='Disk Usage of a Directory')
du_parser.add_argument(
- 'dirs', type=str, help='Name of the directory.', nargs='?', default='')
+ 'dirs', type=str, help='Name of the directory.', nargs='?', default='.')
du_parser.add_argument('-r', action='store_true',
help='Recursive Disk usage of all directories.')
description='Quota management for a Directory')
quota_parser.add_argument('op', choices=['get', 'set'],
help='Quota operation type.')
- quota_parser.add_argument('dir', type=str, help='Name of the directory.')
+ quota_parser.add_argument('path', type=str, help='Name of the directory.')
quota_parser.add_argument('--max_bytes', type=int, default=-1, nargs='?',
help='Max cumulative size of the data under '
'this directory.')
"""
Quota management.
"""
- if not is_dir_exists(args.dir):
- self.poutput("error: no such directory '%s'" % args.dir)
+ if not is_dir_exists(args.path):
+ self.poutput("error: no such directory '%s'" % str(args.path))
return
if args.op == 'set':
if args.max_bytes >= 0:
max_bytes = to_bytes(str(args.max_bytes))
try:
- cephfs.setxattr(args.dir, 'ceph.quota.max_bytes',
+ cephfs.setxattr(args.path, 'ceph.quota.max_bytes',
max_bytes, len(max_bytes),
os.XATTR_CREATE)
self.poutput('max_bytes set to %d' % args.max_bytes)
except libcephfs.Error:
- cephfs.setxattr(args.dir, 'ceph.quota.max_bytes',
+ cephfs.setxattr(args.path, 'ceph.quota.max_bytes',
max_bytes, len(max_bytes),
os.XATTR_REPLACE)
self.poutput('max_bytes reset to %d' % args.max_bytes)
if args.max_files >= 0:
max_files = to_bytes(str(args.max_files))
try:
- cephfs.setxattr(args.dir, 'ceph.quota.max_files',
+ cephfs.setxattr(args.path, 'ceph.quota.max_files',
max_files, len(max_files),
os.XATTR_CREATE)
self.poutput('max_files set to %d' % args.max_files)
except libcephfs.Error:
- cephfs.setxattr(args.dir, 'ceph.quota.max_files',
+ cephfs.setxattr(args.path, 'ceph.quota.max_files',
max_files, len(max_files),
os.XATTR_REPLACE)
self.poutput('max_files reset to %d' % args.max_files)
max_bytes = '0'
max_files = '0'
try:
- max_bytes = cephfs.getxattr(args.dir,
+ max_bytes = cephfs.getxattr(args.path,
'ceph.quota.max_bytes')
self.poutput('max_bytes: %s' % max_bytes)
except libcephfs.Error:
pass
try:
- max_files = cephfs.getxattr(args.dir,
+ max_files = cephfs.getxattr(args.path,
'ceph.quota.max_files')
self.poutput('max_files: %s' % max_files)
except libcephfs.Error:
stat_parser = argparse.ArgumentParser(
description='Display file or file system status')
- stat_parser.add_argument('name', type=str, help='Name of the file',
+ stat_parser.add_argument('paths', type=str, help='file paths',
nargs='+')
@with_argparser(stat_parser)
"""
Display file or file system status
"""
- for files in args.name:
+ for path in args.paths:
try:
- stat = cephfs.stat(files)
+ stat = cephfs.stat(path)
atime = stat.st_atime.isoformat(' ')
mtime = stat.st_mtime.isoformat(' ')
ctime = stat.st_mtime.isoformat(' ')
self.poutput("File: {}\nSize: {:d}\nBlocks: {:d}\nIO Block: {:d}\n\
Device: {:d}\tInode: {:d}\tLinks: {:d}\nPermission: {:o}/{}\tUid: {:d}\tGid: {:d}\n\
-Access: {}\nModify: {}\nChange: {}".format(files, stat.st_size, stat.st_blocks,
+Access: {}\nModify: {}\nChange: {}".format(path, stat.st_size, stat.st_blocks,
stat.st_blksize, stat.st_dev, stat.st_ino,
stat.st_nlink, stat.st_mode,
mode_notation(stat.st_mode), stat.st_uid,
stat.st_gid, atime, mtime, ctime))
except libcephfs.Error:
- self.poutput("{}: no such file or directory".format(files))
+ self.poutput("{}: no such file or directory".format(path))
if __name__ == '__main__':