From 8543d0ed4d7e02cc1cb97e894f6757de444bc2e3 Mon Sep 17 00:00:00 2001
From: Warren Usui
Date: Tue, 3 Dec 2013 18:16:04 -0800
Subject: [PATCH] Added docstrings. Cleaned up code (broke up long lines,
 removed unused variable references, pep8 formatted most of the code (one
 set of long lines remains), and changed some variable and method names to
 conform to pylint standards).

Fixes: 6530
---
 teuthology/task/hadoop.py | 373 ++++++++++++++++++++++++--------------
 1 file changed, 241 insertions(+), 132 deletions(-)

diff --git a/teuthology/task/hadoop.py b/teuthology/task/hadoop.py
index a6575c7629..30e4c69523 100644
--- a/teuthology/task/hadoop.py
+++ b/teuthology/task/hadoop.py
@@ -1,5 +1,10 @@
-from cStringIO import StringIO
+"""
+Hadoop task
+Install and configure hadoop -- requires that Ceph is already installed and
+already running.
+"""
+from cStringIO import StringIO
 
 import contextlib
 import logging
@@ -10,13 +15,12 @@ from ..orchestra import run
 
 log = logging.getLogger(__name__)
 
-###################
-# This installeds and configures Hadoop, but requires that Ceph is already installed and running.
-##################
 
-## Check that there is exactly one master and at least one slave configured
 @contextlib.contextmanager
-def validate_config(ctx, config):
+def validate_cluster(ctx):
+    """
+    Check that there is exactly one master and at least one slave configured
+    """
     log.info('Vaidating Hadoop configuration')
 
     slaves = ctx.cluster.only(teuthology.is_type('hadoop.slave'))
@@ -29,21 +33,27 @@ def validate_config(ctx, config):
     if (len(masters.remotes) == 1):
         pass
     else:
-        raise Exception("Exactly one hadoop.master must be specified. Currently there are " + str(len(masters.remotes)))
+        raise Exception(
+            "Exactly one hadoop.master must be specified. Currently there are "
+            + str(len(masters.remotes)))
 
-    try: 
+    try:
         yield
 
     finally:
         pass
 
-## Add required entries to conf/hadoop-env.sh
-def write_hadoop_env(ctx, config):
-    hadoopEnvFile = "{tdir}/apache_hadoop/conf/hadoop-env.sh".format(tdir=teuthology.get_testdir(ctx))
 
-    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
-        teuthology.write_file(remote, hadoopEnvFile,
+def write_hadoop_env(ctx):
+    """
+    Add required entries to conf/hadoop-env.sh
+    """
+    hadoop_envfile = "{tdir}/apache_hadoop/conf/hadoop-env.sh".format(
+        tdir=teuthology.get_testdir(ctx))
+
+    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote in hadoop_nodes.remotes:
+        teuthology.write_file(remote, hadoop_envfile,
 '''export JAVA_HOME=/usr/lib/jvm/default-java
 export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/share/java/libcephfs.jar:{tdir}/apache_hadoop/build/hadoop-core*.jar:{tdir}/inktank_hadoop/build/hadoop-cephfs.jar
 export HADOOP_NAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_NAMENODE_OPTS"
@@ -51,25 +61,30 @@ export HADOOP_SECONDARYNAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_SEC
 export HADOOP_DATANODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_DATANODE_OPTS"
 export HADOOP_BALANCER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_BALANCER_OPTS"
 export HADOOP_JOBTRACKER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_JOBTRACKER_OPTS"
-'''.format(tdir=teuthology.get_testdir(ctx)) )
-        log.info("wrote file: " + hadoopEnvFile + " to host: " + str(remote))
+'''.format(tdir=teuthology.get_testdir(ctx)))
+        log.info("wrote file: " + hadoop_envfile + " to host: " + str(remote))
+
 
-## Add required entries to conf/core-site.xml
 def write_core_site(ctx, config):
+    """
+    Add required
entries to conf/core-site.xml + """ testdir = teuthology.get_testdir(ctx) - coreSiteFile = "{tdir}/apache_hadoop/conf/core-site.xml".format(tdir=testdir) + core_site_file = "{tdir}/apache_hadoop/conf/core-site.xml".format( + tdir=testdir) - hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop')) - for remote, roles_for_host in hadoopNodes.remotes.iteritems(): + hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop')) + for remote in hadoop_nodes.remotes: # check the config to see if we should use hdfs or ceph default_fs_string = "" if config.get('hdfs'): - default_fs_string = 'hdfs://{master_ip}:54310'.format(master_ip=get_hadoop_master_ip(ctx)) + default_fs_string = 'hdfs://{master_ip}:54310'.format( + master_ip=get_hadoop_master_ip(ctx)) else: default_fs_string = 'ceph:///' - teuthology.write_file(remote, coreSiteFile, + teuthology.write_file(remote, core_site_file, ''' @@ -93,25 +108,33 @@ def write_core_site(ctx, config): '''.format(tdir=teuthology.get_testdir(ctx), default_fs=default_fs_string)) - log.info("wrote file: " + coreSiteFile + " to host: " + str(remote)) + log.info("wrote file: " + core_site_file + " to host: " + str(remote)) + -## finds the hadoop.master in the ctx and then pulls out just the IP address def get_hadoop_master_ip(ctx): + """ + finds the hadoop.master in the ctx and then pulls out just the IP address + """ remote, _ = _get_master(ctx) - master_name,master_port= remote.ssh.get_transport().getpeername() - log.info('master name: {name} port {port}'.format(name=master_name,port=master_port)) + master_name, master_port = remote.ssh.get_transport().getpeername() + log.info('master name: {name} port {port}'.format(name=master_name, + port=master_port)) return master_name -## Add required entries to conf/mapred-site.xml + def write_mapred_site(ctx): - mapredSiteFile = "{tdir}/apache_hadoop/conf/mapred-site.xml".format(tdir=teuthology.get_testdir(ctx)) + """ + Add required entries to conf/mapred-site.xml + """ + mapred_site_file = "{tdir}/apache_hadoop/conf/mapred-site.xml".format( + tdir=teuthology.get_testdir(ctx)) master_ip = get_hadoop_master_ip(ctx) log.info('adding host {remote} as jobtracker'.format(remote=master_ip)) - hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop')) - for remote, roles_for_host in hadoopNodes.remotes.iteritems(): - teuthology.write_file(remote, mapredSiteFile, + hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop')) + for remote in hadoop_nodes.remotes: + teuthology.write_file(remote, mapred_site_file, ''' @@ -120,18 +143,22 @@ def write_mapred_site(ctx): mapred.job.tracker {remote}:54311 - + '''.format(remote=master_ip)) - log.info("wrote file: " + mapredSiteFile + " to host: " + str(remote)) + log.info("wrote file: " + mapred_site_file + " to host: " + str(remote)) + -## Add required entries to conf/hdfs-site.xml def write_hdfs_site(ctx): - hdfsSiteFile = "{tdir}/apache_hadoop/conf/hdfs-site.xml".format(tdir=teuthology.get_testdir(ctx)) + """ + Add required entries to conf/hdfs-site.xml + """ + hdfs_site_file = "{tdir}/apache_hadoop/conf/hdfs-site.xml".format( + tdir=teuthology.get_testdir(ctx)) - hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop')) - for remote, roles_for_host in hadoopNodes.remotes.iteritems(): - teuthology.write_file(remote, hdfsSiteFile, + hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop')) + for remote in hadoop_nodes.remotes: + teuthology.write_file(remote, hdfs_site_file, ''' @@ -141,47 +168,59 @@ def write_hdfs_site(ctx): 1 -''' ) - log.info("wrote file: " + 
hdfsSiteFile + " to host: " + str(remote))
+''')
+        log.info("wrote file: " + hdfs_site_file + " to host: " + str(remote))
+
 
-## Add required entries to conf/slaves
-## These nodes host TaskTrackers and DataNodes
 def write_slaves(ctx):
+    """
+    Add required entries to conf/slaves
+    These nodes host TaskTrackers and DataNodes
+    """
     log.info('Setting up slave nodes...')
 
-    slavesFile = "{tdir}/apache_hadoop/conf/slaves".format(tdir=teuthology.get_testdir(ctx))
-    tmpFile = StringIO()
+    slaves_file = "{tdir}/apache_hadoop/conf/slaves".format(
+        tdir=teuthology.get_testdir(ctx))
+    tmp_file = StringIO()
 
     slaves = ctx.cluster.only(teuthology.is_type('hadoop.slave'))
-    for remote, roles_for_host in slaves.remotes.iteritems():
-        tmpFile.write('{remote}\n'.format(remote=remote.ssh.get_transport().getpeername()[0]))
+    for remote in slaves.remotes:
+        tmp_file.write('{remote}\n'.format(
+            remote=remote.ssh.get_transport().getpeername()[0]))
+
+    tmp_file.seek(0)
 
-    tmpFile.seek(0)
+    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote in hadoop_nodes.remotes:
+        teuthology.write_file(remote=remote, path=slaves_file, data=tmp_file)
+        tmp_file.seek(0)
+        log.info("wrote file: " + slaves_file + " to host: " + str(remote))
 
-    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
-        teuthology.write_file(remote=remote, path=slavesFile, data=tmpFile)
-        tmpFile.seek(0)
-        log.info("wrote file: " + slavesFile + " to host: " + str(remote))
 
-## Add required entries to conf/masters
-## These nodes host JobTrackers and Namenodes
 def write_master(ctx):
-    mastersFile = "{tdir}/apache_hadoop/conf/masters".format(tdir=teuthology.get_testdir(ctx))
+    """
+    Add required entries to conf/masters
+    These nodes host JobTrackers and Namenodes
+    """
+    masters_file = "{tdir}/apache_hadoop/conf/masters".format(
+        tdir=teuthology.get_testdir(ctx))
     master = _get_master(ctx)
     master_remote, _ = master
+    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote in hadoop_nodes.remotes:
+        teuthology.write_file(remote, masters_file, '{master_host}\n'.format(
+            master_host=master_remote.ssh.get_transport().getpeername()[0]))
+        log.info("wrote file: " + masters_file + " to host: " + str(remote))
 
-    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
-        teuthology.write_file(remote, mastersFile, '{master_host}\n'.format(master_host=master_remote.ssh.get_transport().getpeername()[0]))
-        log.info("wrote file: " + mastersFile + " to host: " + str(remote))
 
-## Call the various functions that configure Hadoop
 def _configure_hadoop(ctx, config):
+    """
+    Call the various functions that configure Hadoop
+    """
     log.info('writing out config files')
-    write_hadoop_env(ctx, config)
+    write_hadoop_env(ctx)
     write_core_site(ctx, config)
     write_mapred_site(ctx)
     write_hdfs_site(ctx)
@@ -189,11 +228,13 @@ def _configure_hadoop(ctx, config):
     write_master(ctx)
 
 
-
 @contextlib.contextmanager
 def configure_hadoop(ctx, config):
-    _configure_hadoop(ctx,config)
-
+    """
+    Call the various functions that configure Hadoop, and handle the
+    startup of hadoop and cleanup of temporary files if hdfs is used.
+    """
+    _configure_hadoop(ctx, config)
 
     log.info('config.get(hdfs): {hdfs}'.format(hdfs=config.get('hdfs')))
 
     if config.get('hdfs'):
@@ -203,7 +244,8 @@ def configure_hadoop(ctx, config):
         master = _get_master(ctx)
         remote, _ = master
         remote.run(
-            args=["{tdir}/apache_hadoop/bin/hadoop".format(tdir=teuthology.get_testdir(ctx)),
+            args=["{tdir}/apache_hadoop/bin/hadoop".format(
+                tdir=teuthology.get_testdir(ctx)),
                 "namenode",
                 "-format"],
             wait=True,
@@ -211,7 +253,7 @@ def configure_hadoop(ctx, config):
 
     log.info('done setting up hadoop')
 
-    try: 
+    try:
         yield
 
     finally:
@@ -227,23 +269,32 @@ def configure_hadoop(ctx, config):
                 ),
             )
 
+
 def _start_hadoop(ctx, remote, config):
+    """
+    remotely start hdfs if specified, and start mapred.
+    """
     testdir = teuthology.get_testdir(ctx)
     if config.get('hdfs'):
         remote.run(
-            args=['{tdir}/apache_hadoop/bin/start-dfs.sh'.format(tdir=testdir), ],
+            args=['{tdir}/apache_hadoop/bin/start-dfs.sh'.format(
+                tdir=testdir), ],
             wait=True,
         )
         log.info('done starting hdfs')
 
     remote.run(
-        args=['{tdir}/apache_hadoop/bin/start-mapred.sh'.format(tdir=testdir), ],
+        args=['{tdir}/apache_hadoop/bin/start-mapred.sh'.format(
+            tdir=testdir), ],
         wait=True,
     )
     log.info('done starting mapred')
 
 
 def _stop_hadoop(ctx, remote, config):
+    """
+    remotely stop mapred, and if hdfs is specified, stop the hdfs handler too.
+    """
     testdir = teuthology.get_testdir(ctx)
     remote.run(
         args=['{tdir}/apache_hadoop/bin/stop-mapred.sh'.format(tdir=testdir), ],
@@ -252,44 +303,63 @@
 
     if config.get('hdfs'):
         remote.run(
-            args=['{tdir}/apache_hadoop/bin/stop-dfs.sh'.format(tdir=testdir), ],
+            args=['{tdir}/apache_hadoop/bin/stop-dfs.sh'.format(
+                tdir=testdir), ],
             wait=True,
        )
 
     log.info('done stopping hadoop')
 
+
 def _get_master(ctx):
+    """
+    Return the hadoop master.
If more than one is found, fail an assertion + """ master = ctx.cluster.only(teuthology.is_type('hadoop.master')) - assert 1 == len(master.remotes.items()), 'There must be exactly 1 hadoop.master configured' + assert 1 == len(master.remotes.items()), \ + 'There must be exactly 1 hadoop.master configured' return master.remotes.items()[0] + @contextlib.contextmanager def start_hadoop(ctx, config): + """ + Handle the starting and stopping of hadoop + """ master = _get_master(ctx) remote, _ = master - log.info('Starting hadoop on {remote}\n'.format(remote=remote.ssh.get_transport().getpeername()[0])) + log.info('Starting hadoop on {remote}\n'.format( + remote=remote.ssh.get_transport().getpeername()[0])) _start_hadoop(ctx, remote, config) - try: + try: yield finally: - log.info('Running stop-mapred.sh on {remote}'.format(remote=remote.ssh.get_transport().getpeername()[0])) + log.info('Running stop-mapred.sh on {remote}'.format( + remote=remote.ssh.get_transport().getpeername()[0])) _stop_hadoop(ctx, remote, config) -# download and untar the most recent apache hadoop binaries into {testdir}/apache_hadoop -def _download_apache_hadoop_binaries(ctx, remote, hadoop_url): - log.info('_download_apache_hadoop_binaries: path {path} on host {host}'.format(path=hadoop_url, host=str(remote))) - fileName = 'apache-hadoop.tgz' + +def _download_apache_hadoop_bins(ctx, remote, hadoop_url): + """ + download and untar the most recent apache hadoop binaries into + {testdir}/apache_hadoop + """ + log.info( + '_download_apache_hadoop_bins: path {path} on host {host}'.format( + path=hadoop_url, host=str(remote))) + file_name = 'apache-hadoop.tgz' testdir = teuthology.get_testdir(ctx) remote.run( args=[ - 'mkdir', '-p', '-m0755', '{tdir}/apache_hadoop'.format(tdir=testdir), + 'mkdir', '-p', '-m0755', + '{tdir}/apache_hadoop'.format(tdir=testdir), run.Raw('&&'), 'echo', - '{fileName}'.format(fileName=fileName), + '{file_name}'.format(file_name=file_name), run.Raw('|'), 'wget', '-nv', @@ -298,21 +368,29 @@ def _download_apache_hadoop_binaries(ctx, remote, hadoop_url): # need to use --input-file to make wget respect --base '--input-file=-', run.Raw('|'), - 'tar', '-xzf', '-', '-C', '{tdir}/apache_hadoop'.format(tdir=testdir), + 'tar', '-xzf', '-', '-C', + '{tdir}/apache_hadoop'.format(tdir=testdir), ], ) -# download and untar the most recent Inktank hadoop binaries into {testdir}/hadoop -def _download_inktank_hadoop_binaries(ctx, remote, hadoop_url): - log.info('_download_inktank_hadoop_binaries: path {path} on host {host}'.format(path=hadoop_url, host=str(remote))) - fileName = 'hadoop.tgz' + +def _download_inktank_hadoop_bins(ctx, remote, hadoop_url): + """ + download and untar the most recent Inktank hadoop binaries into + {testdir}/hadoop + """ + log.info( + '_download_inktank_hadoop_bins: path {path} on host {host}'.format( + path=hadoop_url, host=str(remote))) + file_name = 'hadoop.tgz' testdir = teuthology.get_testdir(ctx) remote.run( args=[ - 'mkdir', '-p', '-m0755', '{tdir}/inktank_hadoop'.format(tdir=testdir), + 'mkdir', '-p', '-m0755', + '{tdir}/inktank_hadoop'.format(tdir=testdir), run.Raw('&&'), 'echo', - '{fileName}'.format(fileName=fileName), + '{file_name}'.format(file_name=file_name), run.Raw('|'), 'wget', '-nv', @@ -321,35 +399,54 @@ def _download_inktank_hadoop_binaries(ctx, remote, hadoop_url): # need to use --input-file to make wget respect --base '--input-file=-', run.Raw('|'), - 'tar', '-xzf', '-', '-C', '{tdir}/inktank_hadoop'.format(tdir=testdir), + 'tar', '-xzf', '-', '-C', + 
'{tdir}/inktank_hadoop'.format(tdir=testdir), ], ) -# copy hadoop-cephfs.jar and hadoop-cephfs-test.jar into apache_hadoop + def _copy_hadoop_cephfs_jars(ctx, remote, from_dir, to_dir): + """ + copy hadoop-cephfs.jar and hadoop-cephfs-test.jar into apache_hadoop + """ testdir = teuthology.get_testdir(ctx) - log.info('copy jars from {from_dir} to {to_dir} on host {host}'.format(from_dir=from_dir, to_dir=to_dir, host=str(remote))) - file_names = [ 'hadoop-cephfs.jar', 'hadoop-cephfs-test.jar' ] + log.info('copy jars from {from_dir} to {to_dir} on host {host}'.format( + from_dir=from_dir, to_dir=to_dir, host=str(remote))) + file_names = ['hadoop-cephfs.jar', 'hadoop-cephfs-test.jar'] for file_name in file_names: log.info('Copying file {file_name}'.format(file_name=file_name)) remote.run( - args=[ 'cp', '{tdir}/{from_dir}/{file_name}'.format(tdir=testdir,from_dir=from_dir,file_name=file_name), - '{tdir}/{to_dir}/'.format(tdir=testdir,to_dir=to_dir) + args=['cp', '{tdir}/{from_dir}/{file_name}'.format( + tdir=testdir, from_dir=from_dir, file_name=file_name), + '{tdir}/{to_dir}/'.format(tdir=testdir, to_dir=to_dir) ], ) -def _node_binaries(ctx, config, remote, inktank_hadoop_bindir_url, apache_hadoop_bindir_url): - _download_inktank_hadoop_binaries(ctx, remote, inktank_hadoop_bindir_url) - _download_apache_hadoop_binaries(ctx, remote, apache_hadoop_bindir_url) - _copy_hadoop_cephfs_jars(ctx, remote, 'inktank_hadoop/build', 'apache_hadoop/build') + +def _node_binaries(ctx, remote, inktank_hadoop_bindir_url, + apache_hadoop_bindir_url): + """ + Download and copy over the appropriate binaries and jar files. + The calls from binaries() end up spawning this function on remote sites. + """ + _download_inktank_hadoop_bins(ctx, remote, inktank_hadoop_bindir_url) + _download_apache_hadoop_bins(ctx, remote, apache_hadoop_bindir_url) + _copy_hadoop_cephfs_jars(ctx, remote, 'inktank_hadoop/build', + 'apache_hadoop/build') + @contextlib.contextmanager def binaries(ctx, config): + """ + Fetch the binaries from the gitbuilder, and spawn the download tasks on + the remote machines. 
+ """ path = config.get('path') if path is None: # fetch Apache Hadoop from gitbuilder - log.info('Fetching and unpacking Apache Hadoop binaries from gitbuilder...') + log.info( + 'Fetching and unpacking Apache Hadoop binaries from gitbuilder...') apache_sha1, apache_hadoop_bindir_url = teuthology.get_ceph_binary_url( package='apache-hadoop', branch=config.get('apache_branch'), @@ -364,28 +461,32 @@ def binaries(ctx, config): ctx.summary['apache-hadoop-sha1'] = apache_sha1 # fetch Inktank Hadoop from gitbuilder - log.info('Fetching and unpacking Inktank Hadoop binaries from gitbuilder...') - inktank_sha1, inktank_hadoop_bindir_url = teuthology.get_ceph_binary_url( - package='hadoop', - branch=config.get('inktank_branch'), - tag=config.get('tag'), - sha1=config.get('sha1'), - flavor=config.get('flavor'), - format=config.get('format'), - dist=config.get('dist'), - arch=config.get('arch'), - ) + log.info( + 'Fetching and unpacking Inktank Hadoop binaries from gitbuilder...') + inktank_sha1, inktank_hadoop_bindir_url = \ + teuthology.get_ceph_binary_url( + package='hadoop', + branch=config.get('inktank_branch'), + tag=config.get('tag'), + sha1=config.get('sha1'), + flavor=config.get('flavor'), + format=config.get('format'), + dist=config.get('dist'), + arch=config.get('arch'), + ) log.info('inktank_hadoop_bindir_url %s' % (inktank_hadoop_bindir_url)) ctx.summary['inktank-hadoop-sha1'] = inktank_sha1 else: - raise Exception("The hadoop task does not support the path argument at present") + raise Exception( + "The hadoop task does not support the path argument at present") - with parallel() as p: - hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop')) + with parallel() as parallel_task: + hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop')) # these can happen independently - for remote in hadoopNodes.remotes.iterkeys(): - p.spawn(_node_binaries, ctx, config, remote, inktank_hadoop_bindir_url, apache_hadoop_bindir_url) + for remote in hadoop_nodes.remotes.iterkeys(): + parallel_task.spawn(_node_binaries, ctx, remote, + inktank_hadoop_bindir_url, apache_hadoop_bindir_url) try: yield @@ -393,29 +494,34 @@ def binaries(ctx, config): log.info('Removing hadoop binaries...') run.wait( ctx.cluster.run( - args=[ 'rm', '-rf', '--', '{tdir}/apache_hadoop'.format(tdir=teuthology.get_testdir(ctx))], + args=['rm', '-rf', '--', '{tdir}/apache_hadoop'.format( + tdir=teuthology.get_testdir(ctx))], wait=False, ), ) run.wait( ctx.cluster.run( - args=[ 'rm', '-rf', '--', '{tdir}/inktank_hadoop'.format(tdir=teuthology.get_testdir(ctx))], + args=['rm', '-rf', '--', '{tdir}/inktank_hadoop'.format( + tdir=teuthology.get_testdir(ctx))], wait=False, ), ) -## A Hadoop NameNode will stay in safe mode for 30 seconds by default. -## This method blocks until the NameNode is out of safe mode. + @contextlib.contextmanager def out_of_safemode(ctx, config): - + """ + A Hadoop NameNode will stay in safe mode for 30 seconds by default. + This method blocks until the NameNode is out of safe mode. 
+ """ if config.get('hdfs'): log.info('Waiting for the Namenode to exit safe mode...') master = _get_master(ctx) remote, _ = master remote.run( - args=["{tdir}/apache_hadoop/bin/hadoop".format(tdir=teuthology.get_testdir(ctx)), + args=["{tdir}/apache_hadoop/bin/hadoop".format( + tdir=teuthology.get_testdir(ctx)), "dfsadmin", "-safemode", "wait"], @@ -429,6 +535,7 @@ def out_of_safemode(ctx, config): finally: pass + @contextlib.contextmanager def task(ctx, config): """ @@ -446,7 +553,7 @@ def task(ctx, config): - hadoop: Or if you want to use HDFS under Hadoop, this will configure Hadoop - for HDFS and start it along with MapReduce. Note that it does not + for HDFS and start it along with MapReduce. Note that it does not require Ceph be installed. roles: @@ -458,13 +565,13 @@ def task(ctx, config): - hadoop: hdfs: True - This task requires exactly one hadoop.master be specified + This task requires exactly one hadoop.master be specified and at least one hadoop.slave. - This does *not* do anything with the Hadoop setup. To run wordcount, + This does *not* do anything with the Hadoop setup. To run wordcount, you could use pexec like so (after the hadoop task): - - pexec: + - pexec: hadoop.slave.0: - mkdir -p /tmp/hadoop_input - wget http://ceph.com/qa/hadoop_input_files.tar -O /tmp/hadoop_input/files.tar @@ -473,15 +580,17 @@ def task(ctx, config): - {tdir}/hadoop/bin/hadoop fs -put /tmp/hadoop_input/*txt wordcount_input/ - {tdir}/hadoop/bin/hadoop jar {tdir}/hadoop/build/hadoop-example*jar wordcount wordcount_input wordcount_output - rm -rf /tmp/hadoop_input - """.format(tdir=teuthology.get_testdir(ctx)) + + Note: {tdir} in the above example is the teuthology test directory. + """ if config is None: config = {} assert isinstance(config, dict), \ "task hadoop only supports a dictionary for configuration" - dist = 'precise' - format = 'jar' + dist = 'precise' + format_type = 'jar' arch = 'x86_64' flavor = config.get('flavor', 'basic') @@ -494,27 +603,27 @@ def task(ctx, config): if config.get('apache_hadoop_branch') is not None: apache_branch = config.get('apache_hadoop_branch') else: - apache_branch = 'branch-1.0' # hadoop branch to acquire + apache_branch = 'branch-1.0' # hadoop branch to acquire inktank_branch = None if config.get('inktank_hadoop_branch') is not None: inktank_branch = config.get('inktank_hadoop_branch') else: - inktank_branch = 'cephfs/branch-1.0' # default branch name + inktank_branch = 'cephfs/branch-1.0' # default branch name # replace any '/' with a '_' to match the artifact paths - inktank_branch = inktank_branch.replace('/','_') - apache_branch = apache_branch.replace('/','_') + inktank_branch = inktank_branch.replace('/', '_') + apache_branch = apache_branch.replace('/', '_') with contextutil.nested( - lambda: validate_config(ctx=ctx, config=config), + lambda: validate_cluster(ctx=ctx), lambda: binaries(ctx=ctx, config=dict( tag=config.get('tag'), sha1=config.get('sha1'), path=config.get('path'), flavor=flavor, dist=config.get('dist', dist), - format=format, + format=format_type, arch=arch, apache_branch=apache_branch, inktank_branch=inktank_branch, -- 2.39.5
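
A note on the recurring refactor above: many loops changed from
"for remote, roles_for_host in nodes.remotes.iteritems():" to
"for remote in nodes.remotes:". Below is a minimal standalone sketch of why
the two forms visit the same remotes when the value is unused. The remotes
mapping and host names are hypothetical, and the sketch keeps the module's
Python 2 idioms (iteritems, print statements):

    # Hypothetical mapping shaped like ctx.cluster.remotes:
    # remote -> list of roles on that host.
    remotes = {
        'ubuntu@node1': ['hadoop.master.0'],
        'ubuntu@node2': ['hadoop.slave.0'],
        'ubuntu@node3': ['hadoop.slave.1'],
    }

    # Before the patch: key/value iteration with an unused value.
    for remote, roles_for_host in remotes.iteritems():
        print remote

    # After the patch: iterating a dict yields its keys, so the same
    # remotes are visited and the unused roles_for_host name disappears.
    for remote in remotes:
        print remote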
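The context-manager tasks in this file (validate_cluster, configure_hadoop,
start_hadoop, binaries, out_of_safemode) all share a try/yield/finally
shape. A minimal sketch of that shape follows; the example_task name and the
setup/teardown prints are hypothetical stand-ins for the real start and stop
logic:

    import contextlib

    @contextlib.contextmanager
    def example_task():
        print 'setup'           # runs before any nested tasks
        try:
            yield               # teuthology runs the rest of the job here
        finally:
            print 'teardown'    # runs even if a nested task raises

    with example_task():
        print 'job body'

The finally block is what lets teuthology unwind cleanly: the stop-mapred.sh
call in start_hadoop and the rm -rf cleanup in binaries run whether or not
the nested tasks succeed.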