From 54163923db116c2a2e2a632117e9578dbcc9b9fb Mon Sep 17 00:00:00 2001
From: John Spray
Date: Mon, 9 Nov 2015 13:09:27 +0000
Subject: [PATCH] tasks/ceph: fix up whitespace

...because otherwise it lights up like a Christmas tree in PyCharm.

Signed-off-by: John Spray
(cherry picked from commit f05d9776281d27df03e74e10085e691669532e95)
---
 tasks/ceph.py | 278 ++++++++++++++++++++++++++------------------------
 1 file changed, 142 insertions(+), 136 deletions(-)

diff --git a/tasks/ceph.py b/tasks/ceph.py
index ddd95fd5f0574..24f28b578ad89 100644
--- a/tasks/ceph.py
+++ b/tasks/ceph.py
@@ -20,7 +20,6 @@
 from teuthology.orchestra import run
 import ceph_client as cclient
 from teuthology.orchestra.daemon import DaemonGroup
-
 CEPH_ROLE_TYPES = ['mon', 'osd', 'mds', 'rgw']
 
 log = logging.getLogger(__name__)
@@ -43,10 +42,10 @@ def ceph_log(ctx, config):
                 'chmod',
                 '777',
                 '/var/log/ceph',
-                ],
+            ],
             wait=False,
-            )
         )
+    )
     log.info('Disabling ceph logrotate...')
     run.wait(
         ctx.cluster.run(
@@ -54,10 +53,10 @@ def ceph_log(ctx, config):
                 'sudo',
                 'rm', '-f', '--',
                 '/etc/logrotate.d/ceph',
-                ],
+            ],
             wait=False,
-            )
         )
+    )
     log.info('Creating extra log directories...')
     run.wait(
         ctx.cluster.run(
@@ -66,10 +65,10 @@ def ceph_log(ctx, config):
                 'sudo',
                 'install', '-d', '-m0755', '--',
                 '/var/log/ceph/valgrind',
                 '/var/log/ceph/profiling-logger',
-                ],
+            ],
             wait=False,
-            )
         )
+    )
     try:
         yield
 
@@ -96,10 +95,10 @@ def ceph_log(ctx, config):
                         '--',
                         'gzip',
                         '--',
-                        ],
+                    ],
                     wait=False,
-                    ),
-                )
+                ),
+            )
 
             log.info('Archiving logs...')
             path = os.path.join(ctx.archive, 'remote')
@@ -121,6 +120,7 @@ def assign_devs(roles, devs):
     """
     return dict(zip(roles, devs))
 
+
 @contextlib.contextmanager
 def valgrind_post(ctx, config):
     """
@@ -135,25 +135,25 @@ def valgrind_post(ctx, config):
         yield
     finally:
         lookup_procs = list()
-        log.info('Checking for errors in any valgrind logs...');
+        log.info('Checking for errors in any valgrind logs...')
         for remote in ctx.cluster.remotes.iterkeys():
-            #look at valgrind logs for each node
+            # look at valgrind logs for each node
             proc = remote.run(
                 args=[
                     'sudo',
                     'zgrep',
                     '',
                     run.Raw('/var/log/ceph/valgrind/*'),
-                    '/dev/null', # include a second file so that we always get a filename prefix on the output
+                    '/dev/null',  # include a second file so that we always get a filename prefix on the output
                     run.Raw('|'),
                     'sort',
                     run.Raw('|'),
                     'uniq',
-                    ],
+                ],
                 wait=False,
                 check_status=False,
                 stdout=StringIO(),
-                )
+            )
             lookup_procs.append((proc, remote))
 
         valgrind_exception = None
@@ -177,6 +177,7 @@ def valgrind_post(ctx, config):
     if valgrind_exception is not None:
         raise valgrind_exception
 
+
 @contextlib.contextmanager
 def crush_setup(ctx, config):
     first_mon = teuthology.get_first_mon(ctx, config)
@@ -188,6 +189,7 @@ def crush_setup(ctx, config):
         args=['sudo', 'ceph', 'osd', 'crush', 'tunables', profile])
     yield
 
+
 @contextlib.contextmanager
 def cephfs_setup(ctx, config):
     testdir = teuthology.get_testdir(ctx)
@@ -254,21 +256,20 @@ def cluster(ctx, config):
             args=[
                 'install', '-d', '-m0755', '--',
                 '{tdir}/data'.format(tdir=testdir),
-                ],
+            ],
             wait=False,
-            )
         )
+    )
     run.wait(
         ctx.cluster.run(
             args=[
                 'sudo',
                 'install', '-d', '-m0777', '--', '/var/run/ceph',
-                ],
+            ],
             wait=False,
-            )
         )
-
+    )
     devs_to_clean = {}
     remote_to_roles_to_devs = {}
@@ -285,7 +286,7 @@ def cluster(ctx, config):
         iddevs = devs_id_map.values()
         roles_to_devs = assign_devs(
             teuthology.roles_of_type(roles_for_host, 'osd'), iddevs
-            )
+        )
         if len(roles_to_devs) < len(iddevs):
             iddevs = iddevs[len(roles_to_devs):]
             devs_to_clean[remote] = []
@@ -294,28 +295,28 @@ def cluster(ctx, config):
             log.info('block journal enabled')
             roles_to_journals = assign_devs(
                 teuthology.roles_of_type(roles_for_host, 'osd'), iddevs
-                )
+            )
             log.info('journal map: %s', roles_to_journals)
 
         if config.get('tmpfs_journal'):
             log.info('tmpfs journal enabled')
             roles_to_journals = {}
-            remote.run( args=[ 'sudo', 'mount', '-t', 'tmpfs', 'tmpfs', '/mnt' ] )
+            remote.run(args=['sudo', 'mount', '-t', 'tmpfs', 'tmpfs', '/mnt'])
             for osd in teuthology.roles_of_type(roles_for_host, 'osd'):
                 tmpfs = '/mnt/osd.%s' % osd
                 roles_to_journals[osd] = tmpfs
-                remote.run( args=[ 'truncate', '-s', '1500M', tmpfs ] )
+                remote.run(args=['truncate', '-s', '1500M', tmpfs])
             log.info('journal map: %s', roles_to_journals)
 
         log.info('dev map: %s' % (str(roles_to_devs),))
         remote_to_roles_to_devs[remote] = roles_to_devs
         remote_to_roles_to_journals[remote] = roles_to_journals
 
-
     log.info('Generating config...')
     remotes_and_roles = ctx.cluster.remotes.items()
     roles = [role_list for (remote, role_list) in remotes_and_roles]
-    ips = [host for (host, port) in (remote.ssh.get_transport().getpeername() for (remote, role_list) in remotes_and_roles)]
+    ips = [host for (host, port) in
+           (remote.ssh.get_transport().getpeername() for (remote, role_list) in remotes_and_roles)]
     conf = teuthology.skeleton_config(ctx, roles=roles, ips=ips)
     for remote, roles_to_journals in remote_to_roles_to_journals.iteritems():
         for role, journal in roles_to_journals.iteritems():
@@ -352,8 +353,8 @@ def cluster(ctx, config):
             'ceph-authtool',
             '--create-keyring',
             keyring_path,
-            ],
-        )
+        ],
+    )
     ctx.cluster.only(firstmon).run(
         args=[
             'sudo',
@@ -364,22 +365,22 @@ def cluster(ctx, config):
             '--gen-key',
             '--name=mon.',
             keyring_path,
-            ],
-        )
+        ],
+    )
     ctx.cluster.only(firstmon).run(
         args=[
             'sudo',
             'chmod',
             '0644',
             keyring_path,
-            ],
-        )
+        ],
+    )
     (mon0_remote,) = ctx.cluster.only(firstmon).remotes.keys()
     fsid = teuthology.create_simple_monmap(
         ctx,
         remote=mon0_remote,
         conf=conf,
-        )
+    )
     if not 'global' in conf:
         conf['global'] = {}
     conf['global']['fsid'] = fsid
@@ -403,18 +404,18 @@ def cluster(ctx, config):
             '--cap', 'osd', 'allow *',
             '--cap', 'mds', 'allow *',
             keyring_path,
-            ],
-        )
+        ],
+    )
 
     log.info('Copying monmap to all nodes...')
     keyring = teuthology.get_file(
         remote=mon0_remote,
         path=keyring_path,
-        )
+    )
     monmap = teuthology.get_file(
         remote=mon0_remote,
         path='{tdir}/monmap'.format(tdir=testdir),
-        )
+    )
 
     for rem in ctx.cluster.remotes.iterkeys():
         # copy mon key and initial monmap
@@ -424,12 +425,12 @@ def cluster(ctx, config):
             path=keyring_path,
             data=keyring,
             perms='0644'
-            )
+        )
         teuthology.write_file(
             remote=rem,
             path='{tdir}/monmap'.format(tdir=testdir),
             data=monmap,
-            )
+        )
 
     log.info('Setting up mon nodes...')
     mons = ctx.cluster.only(teuthology.is_type('mon'))
@@ -444,14 +445,14 @@ def cluster(ctx, config):
                 '--clobber',
                 '--createsimple', '{num:d}'.format(
                     num=teuthology.num_instances_of_type(ctx.cluster, 'osd'),
-                    ),
+                ),
                 '{tdir}/osdmap'.format(tdir=testdir),
                 '--pg_bits', '2',
                 '--pgp_bits', '4',
-                ],
+            ],
             wait=False,
-            ),
-        )
+        ),
+    )
 
     log.info('Setting up mds nodes...')
     mdss = ctx.cluster.only(teuthology.is_type('mds'))
@@ -473,8 +474,8 @@ def cluster(ctx, config):
                     '--gen-key',
                     '--name=mds.{id}'.format(id=id_),
                     '/var/lib/ceph/mds/ceph-{id}/keyring'.format(id=id_),
-                    ],
-                )
+                ],
+            )
     cclient.create_keyring(ctx)
     log.info('Running mkfs on osd nodes...')
 
@@ -490,7 +491,6 @@ def cluster(ctx, config):
 
         roles_to_devs = remote_to_roles_to_devs[remote]
         roles_to_journals = remote_to_roles_to_journals[remote]
-
         for id_ in teuthology.roles_of_type(roles_for_host, 'osd'):
             remote.run(
                 args=[
@@ -498,7 +498,7 @@ def cluster(ctx, config):
                     'mkdir',
                     '-p',
                     '/var/lib/ceph/osd/ceph-{id}'.format(id=id_),
-                    ])
+                ])
             log.info(str(roles_to_journals))
             log.info(id_)
             if roles_to_devs.get(id_):
@@ -508,22 +508,22 @@ def cluster(ctx, config):
                 mkfs_options = config.get('mkfs_options')
                 mount_options = config.get('mount_options')
                 if fs == 'btrfs':
-                    #package = 'btrfs-tools'
+                    # package = 'btrfs-tools'
                     if mount_options is None:
-                        mount_options = ['noatime','user_subvol_rm_allowed']
+                        mount_options = ['noatime', 'user_subvol_rm_allowed']
                     if mkfs_options is None:
                         mkfs_options = ['-m', 'single',
                                         '-l', '32768',
                                         '-n', '32768']
                 if fs == 'xfs':
-                    #package = 'xfsprogs'
+                    # package = 'xfsprogs'
                     if mount_options is None:
                         mount_options = ['noatime']
                     if mkfs_options is None:
                         mkfs_options = ['-f', '-i', 'size=2048']
                 if fs == 'ext4' or fs == 'ext3':
                     if mount_options is None:
-                        mount_options = ['noatime','user_xattr']
+                        mount_options = ['noatime', 'user_xattr']
 
                 if mount_options is None:
                     mount_options = []
@@ -536,19 +536,19 @@ def cluster(ctx, config):
                     args=[
                         'sudo',
                         'apt-get', 'install', '-y', package
-                        ],
+                    ],
                     stdout=StringIO(),
-                    )
+                )
 
                 try:
-                    remote.run(args= ['yes', run.Raw('|')] + ['sudo'] + mkfs + [dev])
+                    remote.run(args=['yes', run.Raw('|')] + ['sudo'] + mkfs + [dev])
                 except run.CommandFailedError:
                     # Newer btfs-tools doesn't prompt for overwrite, use -f
                     if '-f' not in mount_options:
                         mkfs_options.append('-f')
                         mkfs = ['mkfs.%s' % fs] + mkfs_options
                         log.info('%s on %s on %s' % (mkfs, dev, remote))
-                        remote.run(args= ['yes', run.Raw('|')] + ['sudo'] + mkfs + [dev])
+                        remote.run(args=['yes', run.Raw('|')] + ['sudo'] + mkfs + [dev])
 
                 log.info('mount %s on %s -o %s' % (dev, remote, ','.join(mount_options)))
@@ -560,8 +560,8 @@ def cluster(ctx, config):
                         '-o', ','.join(mount_options),
                         dev,
                         os.path.join('/var/lib/ceph/osd', 'ceph-{id}'.format(id=id_)),
-                        ]
-                    )
+                    ]
+                )
                 if not remote in ctx.disk_config.remote_to_roles_to_dev_mount_options:
                     ctx.disk_config.remote_to_roles_to_dev_mount_options[remote] = {}
                 ctx.disk_config.remote_to_roles_to_dev_mount_options[remote][id_] = mount_options
@@ -571,8 +571,8 @@ def cluster(ctx, config):
                 devs_to_clean[remote].append(
                     os.path.join(
                         os.path.join('/var/lib/ceph/osd', 'ceph-{id}'.format(id=id_)),
-                        )
                     )
+                )
 
         for id_ in teuthology.roles_of_type(roles_for_host, 'osd'):
             remote.run(
@@ -587,24 +587,23 @@ def cluster(ctx, config):
                     '--mkkey',
                     '-i', id_,
                     '--monmap', '{tdir}/monmap'.format(tdir=testdir),
-                    ],
-                )
-
+                ],
+            )
 
     log.info('Reading keys from all nodes...')
     keys_fp = StringIO()
     keys = []
     for remote, roles_for_host in ctx.cluster.remotes.iteritems():
-        for type_ in ['mds','osd']:
+        for type_ in ['mds', 'osd']:
             for id_ in teuthology.roles_of_type(roles_for_host, type_):
                 data = teuthology.get_file(
                     remote=remote,
                     path='/var/lib/ceph/{type}/ceph-{id}/keyring'.format(
                         type=type_,
                         id=id_,
-                        ),
+                    ),
                     sudo=True,
-                    )
+                )
                 keys.append((type_, id_, data))
                 keys_fp.write(data)
     for remote, roles_for_host in ctx.cluster.remotes.iteritems():
@@ -613,7 +612,7 @@ def cluster(ctx, config):
             data = teuthology.get_file(
                 remote=remote,
                 path='/etc/ceph/ceph.client.{id}.keyring'.format(id=id_)
-                )
+            )
             keys.append((type_, id_, data))
             keys_fp.write(data)
 
@@ -622,11 +621,11 @@ def cluster(ctx, config):
         args=[
             'sudo', 'tee', '-a',
             keyring_path,
-            ],
+        ],
         stdin=run.PIPE,
         wait=False,
         stdout=StringIO(),
-        )
+    )
     keys_fp.seek(0)
     teuthology.feed_many_stdins_and_close(keys_fp, writes)
     run.wait(writes)
@@ -634,32 +633,32 @@ def cluster(ctx, config):
     for type_, id_, data in keys:
         run.wait(
             mons.run(
                 args=[
-                'sudo',
-                'adjust-ulimits',
-                'ceph-coverage',
-                coverage_dir,
-                'ceph-authtool',
-                keyring_path,
-                '--name={type}.{id}'.format(
-                    type=type_,
-                    id=id_,
-                ),
-                ] + list(teuthology.generate_caps(type_)),
+                    'sudo',
+                    'adjust-ulimits',
+                    'ceph-coverage',
+                    coverage_dir,
+                    'ceph-authtool',
+                    keyring_path,
+                    '--name={type}.{id}'.format(
+                        type=type_,
+                        id=id_,
+                    ),
+                ] + list(teuthology.generate_caps(type_)),
                 wait=False,
-                ),
-            )
+            ),
+        )
 
     log.info('Running mkfs on mon nodes...')
     for remote, roles_for_host in mons.remotes.iteritems():
         for id_ in teuthology.roles_of_type(roles_for_host, 'mon'):
             remote.run(
                 args=[
-                'sudo',
-                'mkdir',
-                '-p',
-                '/var/lib/ceph/mon/ceph-{id}'.format(id=id_),
-                ],
-            )
+                    'sudo',
+                    'mkdir',
+                    '-p',
+                    '/var/lib/ceph/mon/ceph-{id}'.format(id=id_),
+                ],
+            )
             remote.run(
                 args=[
                     'sudo',
@@ -672,9 +671,8 @@ def cluster(ctx, config):
                     'adjust-ulimits',
                    'ceph-coverage',
                     coverage_dir,
                     'ceph-mon',
                     '--mkfs',
                     '-i', id_,
                     '--monmap={tdir}/monmap'.format(tdir=testdir),
                     '--osdmap={tdir}/osdmap'.format(tdir=testdir),
                     '--keyring={kpath}'.format(kpath=keyring_path),
-                    ],
-                )
-
+                ],
+            )
 
     run.wait(
         mons.run(
@@ -683,10 +681,10 @@ def cluster(ctx, config):
                 '--',
                 '{tdir}/monmap'.format(tdir=testdir),
                 '{tdir}/osdmap'.format(tdir=testdir),
-                ],
+            ],
             wait=False,
-            ),
-        )
+        ),
+    )
 
     try:
         yield
@@ -698,6 +696,7 @@ def cluster(ctx, config):
         (mon0_remote,) = ctx.cluster.only(firstmon).remotes.keys()
 
         log.info('Checking cluster log for badness...')
+
         def first_in_ceph_log(pattern, excludes):
             """
             Find the first occurence of the pattern specified in the Ceph log,
@@ -711,16 +710,16 @@ def cluster(ctx, config):
                 'sudo',
                 'egrep', pattern,
                 '/var/log/ceph/ceph.log',
-                ]
+            ]
             for exclude in excludes:
                 args.extend([run.Raw('|'), 'egrep', '-v', exclude])
             args.extend([
-                    run.Raw('|'), 'head', '-n', '1',
-                    ])
+                run.Raw('|'), 'head', '-n', '1',
+            ])
             r = mon0_remote.run(
                 stdout=StringIO(),
                 args=args,
-                )
+            )
             stdout = r.stdout.getvalue()
             if stdout != '':
                 return stdout
@@ -737,7 +736,7 @@ def cluster(ctx, config):
             if match is not None:
                 ctx.summary['failure_reason'] = \
                     '"{match}" in cluster log'.format(
-                    match=match.rstrip('\n'),
+                        match=match.rstrip('\n'),
                     )
                 break
 
@@ -757,24 +756,24 @@ def cluster(ctx, config):
                     )
                 except Exception as e:
                     remote.run(args=[
-                            'sudo',
-                            run.Raw('PATH=/usr/sbin:$PATH'),
-                            'lsof',
-                            run.Raw(';'),
-                            'ps', 'auxf',
-                            ])
+                        'sudo',
+                        run.Raw('PATH=/usr/sbin:$PATH'),
+                        'lsof',
+                        run.Raw(';'),
+                        'ps', 'auxf',
+                    ])
                     raise e
 
         if config.get('tmpfs_journal'):
             log.info('tmpfs journal enabled - unmounting tmpfs at /mnt')
             for remote, roles_for_host in osds.remotes.iteritems():
                 remote.run(
-                    args=[ 'sudo', 'umount', '-f', '/mnt' ],
+                    args=['sudo', 'umount', '-f', '/mnt'],
                     check_status=False,
                 )
 
         if ctx.archive is not None and \
-            not (ctx.config.get('archive-on-error') and ctx.summary['success']):
+                not (ctx.config.get('archive-on-error') and ctx.summary['success']):
 
             # archive mon data, too
             log.info('Archiving mon data...')
@@ -800,10 +799,11 @@ def cluster(ctx, config):
                     keyring_path,
                     '{tdir}/data'.format(tdir=testdir),
                     '{tdir}/monmap'.format(tdir=testdir),
-                    ],
+                ],
                 wait=False,
-                ),
-            )
+            ),
+        )
+
 
 def get_all_pg_info(rem_site, testdir):
     """
@@ -818,6 +818,7 @@ def get_all_pg_info(rem_site, testdir):
     all_info = json.loads(info.stdout.getvalue())
     return all_info['pg_stats']
 
+
 def osd_scrub_pgs(ctx, config):
     """
     Scrub pgs when we exit.
@@ -835,7 +836,7 @@ def osd_scrub_pgs(ctx, config):
     rem_site = ctx.cluster.remotes.keys()[0]
     all_clean = False
     for _ in range(0, retries):
-        stats = get_all_pg_info(rem_site, testdir) 
+        stats = get_all_pg_info(rem_site, testdir)
         states = [stat['state'] for stat in stats]
         if len(set(states)) == 1 and states[0] == 'active+clean':
             all_clean = True
@@ -852,15 +853,15 @@ def osd_scrub_pgs(ctx, config):
         if role.startswith('osd.'):
             log.info("Scrubbing osd {osd}".format(osd=role))
             rem_site.run(args=[
-                          'adjust-ulimits',
-                          'ceph-coverage',
-                          '{tdir}/archive/coverage'.format(tdir=testdir),
-                          'ceph', 'osd', 'deep-scrub', role])
+                'adjust-ulimits',
+                'ceph-coverage',
+                '{tdir}/archive/coverage'.format(tdir=testdir),
+                'ceph', 'osd', 'deep-scrub', role])
     prev_good = 0
     gap_cnt = 0
     loop = True
     while loop:
-        stats = get_all_pg_info(rem_site, testdir) 
+        stats = get_all_pg_info(rem_site, testdir)
         timez = [stat['last_scrub_stamp'] for stat in stats]
         loop = False
         thiscnt = 0
@@ -882,6 +883,7 @@ def osd_scrub_pgs(ctx, config):
         log.info('Still waiting for all pgs to be scrubbed.')
         time.sleep(delays)
 
+
 @contextlib.contextmanager
 def run_daemon(ctx, config, type_):
     """
@@ -918,7 +920,7 @@ def run_daemon(ctx, config, type_):
             coverage_dir,
             'daemon-helper',
             daemon_signal,
-            ]
+        ]
         run_cmd_tail = [
             'ceph-%s' % (type_),
             '-f',
@@ -926,7 +928,7 @@ def run_daemon(ctx, config, type_):
 
         if type_ in config.get('cpu_profile', []):
             profile_path = '/var/log/ceph/profiling-logger/%s.%s.prof' % (type_, id_)
-            run_cmd.extend([ 'env', 'CPUPROFILE=%s' % profile_path ])
+            run_cmd.extend(['env', 'CPUPROFILE=%s' % profile_path])
 
         if config.get('valgrind') is not None:
             valgrind_args = None
@@ -952,6 +954,7 @@ def run_daemon(ctx, config, type_):
     finally:
         teuthology.stop_daemons_of_type(ctx, type_)
 
+
 def healthy(ctx, config):
     """
     Wait for all osd's to be up, and for the ceph health monitor to return HEALTH_OK.
@@ -966,11 +969,12 @@ def healthy(ctx, config):
         ctx,
         cluster=ctx.cluster,
         remote=mon0_remote
-        )
+    )
     teuthology.wait_until_healthy(
         ctx,
         remote=mon0_remote,
-        )
+    )
+
 
 def wait_for_osds_up(ctx, config):
     """
@@ -986,7 +990,8 @@ def wait_for_osds_up(ctx, config):
         ctx,
         cluster=ctx.cluster,
         remote=mon0_remote
-        )
+    )
+
 
 def wait_for_mon_quorum(ctx, config):
     """
@@ -1004,10 +1009,10 @@ def wait_for_mon_quorum(ctx, config):
             args=[
                 'ceph',
                 'quorum_status',
-                ],
+            ],
             stdout=StringIO(),
             logger=log.getChild('quorum_status'),
-            )
+        )
         j = json.loads(r.stdout.getvalue())
         q = j.get('quorum_names', [])
         log.debug('Quorum: %s', q)
@@ -1024,8 +1029,8 @@ def created_pool(ctx, config):
     for new_pool in config:
         if new_pool not in ctx.manager.pools:
             ctx.manager.pools[new_pool] = ctx.manager.get_pool_property(
-                                              new_pool, 'pg_num')
-
+                new_pool, 'pg_num')
+
 
 @contextlib.contextmanager
 def restart(ctx, config):
@@ -1099,6 +1104,7 @@ def stop(ctx, config):
 
     yield
 
+
 @contextlib.contextmanager
 def wait_for_failure(ctx, config):
     """
@@ -1250,30 +1256,30 @@ def task(ctx, config):
             args=[
                 'install', '-d', '-m0755', '--',
                 coverage_dir,
-                ],
+            ],
             wait=False,
-            )
         )
+    )
 
     with contextutil.nested(
-        lambda: ceph_log(ctx=ctx, config=None),
-        lambda: valgrind_post(ctx=ctx, config=config),
-        lambda: cluster(ctx=ctx, config=dict(
+            lambda: ceph_log(ctx=ctx, config=None),
+            lambda: valgrind_post(ctx=ctx, config=config),
+            lambda: cluster(ctx=ctx, config=dict(
                 conf=config.get('conf', {}),
                 fs=config.get('fs', None),
                 mkfs_options=config.get('mkfs_options', None),
-                mount_options=config.get('mount_options',None),
+                mount_options=config.get('mount_options', None),
                 block_journal=config.get('block_journal', None),
                 tmpfs_journal=config.get('tmpfs_journal', None),
                 log_whitelist=config.get('log-whitelist', []),
                 cpu_profile=set(config.get('cpu_profile', [])),
-            )),
-        lambda: run_daemon(ctx=ctx, config=config, type_='mon'),
-        lambda: crush_setup(ctx=ctx, config=config),
-        lambda: run_daemon(ctx=ctx, config=config, type_='osd'),
-        lambda: cephfs_setup(ctx=ctx, config=config),
-        lambda: run_daemon(ctx=ctx, config=config, type_='mds'),
-        ):
+                )),
+            lambda: run_daemon(ctx=ctx, config=config, type_='mon'),
+            lambda: crush_setup(ctx=ctx, config=config),
+            lambda: run_daemon(ctx=ctx, config=config, type_='osd'),
+            lambda: cephfs_setup(ctx=ctx, config=config),
+            lambda: run_daemon(ctx=ctx, config=config, type_='mds'),
+    ):
         try:
             if config.get('wait-for-healthy', True):
                 healthy(ctx=ctx, config=None)
-- 
2.39.5