]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Added docstrings. Cleaned up code (broke up long lines, removed unused
authorWarren Usui <warren.usui@inktank.com>
Wed, 4 Dec 2013 02:16:04 +0000 (18:16 -0800)
committerWarren Usui <warren.usui@inktank.com>
Wed, 4 Dec 2013 02:16:04 +0000 (18:16 -0800)
variable references, pep8 formatted most of the code (one set of long lines
remains), and changed some variable and method names to conform to pylint
standards).

Fixes: 6530
teuthology/task/hadoop.py

index a6575c7629104d3bb0180c8026e90bcef2dd0516..30e4c69523c0faba9049715cda52d1eee2615e25 100644 (file)
@@ -1,5 +1,10 @@
-from cStringIO import StringIO
+"""
+Hadoop task
 
+Install and cofigure hadoop -- requires that Ceph is already installed and
+already running.
+"""
+from cStringIO import StringIO
 import contextlib
 import logging
 
@@ -10,13 +15,12 @@ from ..orchestra import run
 
 log = logging.getLogger(__name__)
 
-###################
-# This installeds and configures Hadoop, but requires that Ceph is already installed and running.
-##################
 
-## Check that there is exactly one master and at least one slave configured
 @contextlib.contextmanager
-def validate_config(ctx, config):
+def validate_cluster(ctx):
+    """
+    Check that there is exactly one master and at least one slave configured
+    """
     log.info('Vaidating Hadoop configuration')
     slaves = ctx.cluster.only(teuthology.is_type('hadoop.slave'))
 
@@ -29,21 +33,27 @@ def validate_config(ctx, config):
     if (len(masters.remotes) == 1):
         pass
     else:
-        raise Exception("Exactly one hadoop.master must be specified. Currently there are " + str(len(masters.remotes)))
+        raise Exception(
+           "Exactly one hadoop.master must be specified. Currently there are "
+           + str(len(masters.remotes)))
 
-    try: 
+    try:
         yield
 
     finally:
         pass
 
-## Add required entries to conf/hadoop-env.sh
-def write_hadoop_env(ctx, config):
-    hadoopEnvFile = "{tdir}/apache_hadoop/conf/hadoop-env.sh".format(tdir=teuthology.get_testdir(ctx))
 
-    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
-        teuthology.write_file(remote, hadoopEnvFile, 
+def write_hadoop_env(ctx):
+    """
+    Add required entries to conf/hadoop-env.sh
+    """
+    hadoop_envfile = "{tdir}/apache_hadoop/conf/hadoop-env.sh".format(
+            tdir=teuthology.get_testdir(ctx))
+
+    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote in hadoop_nodes.remotes:
+        teuthology.write_file(remote, hadoop_envfile,
 '''export JAVA_HOME=/usr/lib/jvm/default-java
 export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/share/java/libcephfs.jar:{tdir}/apache_hadoop/build/hadoop-core*.jar:{tdir}/inktank_hadoop/build/hadoop-cephfs.jar
 export HADOOP_NAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_NAMENODE_OPTS"
@@ -51,25 +61,30 @@ export HADOOP_SECONDARYNAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_SEC
 export HADOOP_DATANODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_DATANODE_OPTS"
 export HADOOP_BALANCER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_BALANCER_OPTS"
 export HADOOP_JOBTRACKER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_JOBTRACKER_OPTS"
-'''.format(tdir=teuthology.get_testdir(ctx))     )
-        log.info("wrote file: " + hadoopEnvFile + " to host: " + str(remote))
+'''.format(tdir=teuthology.get_testdir(ctx)))
+        log.info("wrote file: " + hadoop_envfile + " to host: " + str(remote))
+
 
-## Add required entries to conf/core-site.xml
 def write_core_site(ctx, config):
+    """
+    Add required entries to conf/core-site.xml
+    """
     testdir = teuthology.get_testdir(ctx)
-    coreSiteFile = "{tdir}/apache_hadoop/conf/core-site.xml".format(tdir=testdir)
+    core_site_file = "{tdir}/apache_hadoop/conf/core-site.xml".format(
+            tdir=testdir)
 
-    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
+    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote in hadoop_nodes.remotes:
 
         # check the config to see if we should use hdfs or ceph
         default_fs_string = ""
         if config.get('hdfs'):
-            default_fs_string = 'hdfs://{master_ip}:54310'.format(master_ip=get_hadoop_master_ip(ctx))
+            default_fs_string = 'hdfs://{master_ip}:54310'.format(
+                    master_ip=get_hadoop_master_ip(ctx))
         else:
             default_fs_string = 'ceph:///'
 
-        teuthology.write_file(remote, coreSiteFile, 
+        teuthology.write_file(remote, core_site_file,
 '''<?xml version="1.0"?>
 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <!-- Put site-specific property overrides in this file.  -->
@@ -93,25 +108,33 @@ def write_core_site(ctx, config):
 </configuration>
 '''.format(tdir=teuthology.get_testdir(ctx), default_fs=default_fs_string))
 
-        log.info("wrote file: " + coreSiteFile + " to host: " + str(remote))
+        log.info("wrote file: " + core_site_file + " to host: " + str(remote))
+
 
-## finds the hadoop.master in the ctx and then pulls out just the IP address
 def get_hadoop_master_ip(ctx):
+    """
+    finds the hadoop.master in the ctx and then pulls out just the IP address
+    """
     remote, _ = _get_master(ctx)
-    master_name,master_port= remote.ssh.get_transport().getpeername()
-    log.info('master name: {name} port {port}'.format(name=master_name,port=master_port))
+    master_name, master_port = remote.ssh.get_transport().getpeername()
+    log.info('master name: {name} port {port}'.format(name=master_name,
+            port=master_port))
     return master_name
 
-## Add required entries to conf/mapred-site.xml
+
 def write_mapred_site(ctx):
-    mapredSiteFile = "{tdir}/apache_hadoop/conf/mapred-site.xml".format(tdir=teuthology.get_testdir(ctx))
+    """
+    Add required entries to conf/mapred-site.xml
+    """
+    mapred_site_file = "{tdir}/apache_hadoop/conf/mapred-site.xml".format(
+            tdir=teuthology.get_testdir(ctx))
 
     master_ip = get_hadoop_master_ip(ctx)
     log.info('adding host {remote} as jobtracker'.format(remote=master_ip))
 
-    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
-        teuthology.write_file(remote, mapredSiteFile, 
+    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote in hadoop_nodes.remotes:
+        teuthology.write_file(remote, mapred_site_file,
 '''<?xml version="1.0"?>
 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <!-- Put site-specific property overrides in this file. -->
@@ -120,18 +143,22 @@ def write_mapred_site(ctx):
         <name>mapred.job.tracker</name>
         <value>{remote}:54311</value>
     </property>
-</configuration>  
+</configuration>
 '''.format(remote=master_ip))
 
-        log.info("wrote file: " + mapredSiteFile + " to host: " + str(remote))
+        log.info("wrote file: " + mapred_site_file + " to host: " + str(remote))
+
 
-## Add required entries to conf/hdfs-site.xml
 def write_hdfs_site(ctx):
-    hdfsSiteFile = "{tdir}/apache_hadoop/conf/hdfs-site.xml".format(tdir=teuthology.get_testdir(ctx))
+    """
+    Add required entries to conf/hdfs-site.xml
+    """
+    hdfs_site_file = "{tdir}/apache_hadoop/conf/hdfs-site.xml".format(
+            tdir=teuthology.get_testdir(ctx))
 
-    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
-        teuthology.write_file(remote, hdfsSiteFile, 
+    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote in hadoop_nodes.remotes:
+        teuthology.write_file(remote, hdfs_site_file,
 '''<?xml version="1.0"?>
 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <!-- Put site-specific property overrides in this file. -->
@@ -141,47 +168,59 @@ def write_hdfs_site(ctx):
         <value>1</value>
     </property>
 </configuration>
-'''     )
-        log.info("wrote file: " + hdfsSiteFile + " to host: " + str(remote))
+''')
+        log.info("wrote file: " + hdfs_site_file + " to host: " + str(remote))
+
 
-## Add required entries to conf/slaves 
-## These nodes host TaskTrackers and DataNodes
 def write_slaves(ctx):
+    """
+    Add required entries to conf/slaves
+    These nodes host TaskTrackers and DataNodes
+    """
     log.info('Setting up slave nodes...')
 
-    slavesFile = "{tdir}/apache_hadoop/conf/slaves".format(tdir=teuthology.get_testdir(ctx))
-    tmpFile = StringIO()
+    slaves_file = "{tdir}/apache_hadoop/conf/slaves".format(
+            tdir=teuthology.get_testdir(ctx))
+    tmp_file = StringIO()
 
     slaves = ctx.cluster.only(teuthology.is_type('hadoop.slave'))
-    for remote, roles_for_host in slaves.remotes.iteritems():
-        tmpFile.write('{remote}\n'.format(remote=remote.ssh.get_transport().getpeername()[0]))
+    for remote in slaves.remotes:
+        tmp_file.write('{remote}\n'.format(
+                remote=remote.ssh.get_transport().getpeername()[0]))
+
+    tmp_file.seek(0)
 
-    tmpFile.seek(0)
+    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote in hadoop_nodes.remotes:
+        teuthology.write_file(remote=remote, path=slaves_file, data=tmp_file)
+        tmp_file.seek(0)
+        log.info("wrote file: " + slaves_file + " to host: " + str(remote))
 
-    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
-        teuthology.write_file(remote=remote, path=slavesFile, data=tmpFile)
-        tmpFile.seek(0)
-        log.info("wrote file: " + slavesFile + " to host: " + str(remote))
 
-## Add required entries to conf/masters 
-## These nodes host JobTrackers and Namenodes
 def write_master(ctx):
-    mastersFile = "{tdir}/apache_hadoop/conf/masters".format(tdir=teuthology.get_testdir(ctx))
+    """
+    Add required entries to conf/masters
+    These nodes host JobTrackers and Namenodes
+    """
+    masters_file = "{tdir}/apache_hadoop/conf/masters".format(
+            tdir=teuthology.get_testdir(ctx))
     master = _get_master(ctx)
     master_remote, _ = master
 
+    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote in hadoop_nodes.remotes:
+        teuthology.write_file(remote, masters_file, '{master_host}\n'.format(
+                master_host=master_remote.ssh.get_transport().getpeername()[0]))
+        log.info("wrote file: " + masters_file + " to host: " + str(remote))
 
-    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
-        teuthology.write_file(remote, mastersFile, '{master_host}\n'.format(master_host=master_remote.ssh.get_transport().getpeername()[0]))
-        log.info("wrote file: " + mastersFile + " to host: " + str(remote))
 
-## Call the various functions that configure Hadoop
 def _configure_hadoop(ctx, config):
+    """
+    Call the various functions that configure Hadoop
+    """
     log.info('writing out config files')
 
-    write_hadoop_env(ctx, config)
+    write_hadoop_env(ctx)
     write_core_site(ctx, config)
     write_mapred_site(ctx)
     write_hdfs_site(ctx)
@@ -189,11 +228,13 @@ def _configure_hadoop(ctx, config):
     write_master(ctx)
 
 
-
 @contextlib.contextmanager
 def configure_hadoop(ctx, config):
-    _configure_hadoop(ctx,config)
-
+    """
+    Call the various functions that configure Hadoop, and handle the
+    startup of hadoop and clean up of temporary files if this is an hdfs.
+    """
+    _configure_hadoop(ctx, config)
     log.info('config.get(hdfs): {hdfs}'.format(hdfs=config.get('hdfs')))
 
     if config.get('hdfs'):
@@ -203,7 +244,8 @@ def configure_hadoop(ctx, config):
         master = _get_master(ctx)
         remote, _ = master
         remote.run(
-        args=["{tdir}/apache_hadoop/bin/hadoop".format(tdir=teuthology.get_testdir(ctx)),
+        args=["{tdir}/apache_hadoop/bin/hadoop".format(
+                tdir=teuthology.get_testdir(ctx)),
               "namenode",
               "-format"],
             wait=True,
@@ -211,7 +253,7 @@ def configure_hadoop(ctx, config):
 
     log.info('done setting up hadoop')
 
-    try: 
+    try:
         yield
 
     finally:
@@ -227,23 +269,32 @@ def configure_hadoop(ctx, config):
                 ),
             )
 
+
 def _start_hadoop(ctx, remote, config):
+    """
+    remotely start hdfs if specified, and start mapred.
+    """
     testdir = teuthology.get_testdir(ctx)
     if config.get('hdfs'):
         remote.run(
-            args=['{tdir}/apache_hadoop/bin/start-dfs.sh'.format(tdir=testdir), ],
+            args=['{tdir}/apache_hadoop/bin/start-dfs.sh'.format(
+                    tdir=testdir), ],
             wait=True,
         )
         log.info('done starting hdfs')
 
     remote.run(
-        args=['{tdir}/apache_hadoop/bin/start-mapred.sh'.format(tdir=testdir), ],
+        args=['{tdir}/apache_hadoop/bin/start-mapred.sh'.format(
+                tdir=testdir), ],
         wait=True,
     )
     log.info('done starting mapred')
 
 
 def _stop_hadoop(ctx, remote, config):
+    """
+    remotely stop mapred, and if hdfs if specified, stop the hdfs handler too.
+    """
     testdir = teuthology.get_testdir(ctx)
     remote.run(
         args=['{tdir}/apache_hadoop/bin/stop-mapred.sh'.format(tdir=testdir), ],
@@ -252,44 +303,63 @@ def _stop_hadoop(ctx, remote, config):
 
     if config.get('hdfs'):
         remote.run(
-            args=['{tdir}/apache_hadoop/bin/stop-dfs.sh'.format(tdir=testdir), ],
+            args=['{tdir}/apache_hadoop/bin/stop-dfs.sh'.format(
+                tdir=testdir), ],
             wait=True,
         )
 
     log.info('done stopping hadoop')
 
+
 def _get_master(ctx):
+    """
+    Return the hadoop master.  If more than one is found, fail an assertion
+    """
     master = ctx.cluster.only(teuthology.is_type('hadoop.master'))
-    assert 1 == len(master.remotes.items()), 'There must be exactly 1 hadoop.master configured'
+    assert 1 == len(master.remotes.items()), \
+            'There must be exactly 1 hadoop.master configured'
 
     return master.remotes.items()[0]
 
+
 @contextlib.contextmanager
 def start_hadoop(ctx, config):
+    """
+    Handle the starting and stopping of hadoop
+    """
     master = _get_master(ctx)
     remote, _ = master
 
-    log.info('Starting hadoop on {remote}\n'.format(remote=remote.ssh.get_transport().getpeername()[0]))
+    log.info('Starting hadoop on {remote}\n'.format(
+            remote=remote.ssh.get_transport().getpeername()[0]))
     _start_hadoop(ctx, remote, config)
 
-    try: 
+    try:
         yield
 
     finally:
-        log.info('Running stop-mapred.sh on {remote}'.format(remote=remote.ssh.get_transport().getpeername()[0]))
+        log.info('Running stop-mapred.sh on {remote}'.format(
+                remote=remote.ssh.get_transport().getpeername()[0]))
         _stop_hadoop(ctx, remote, config)
 
-# download and untar the most recent apache hadoop binaries into {testdir}/apache_hadoop
-def _download_apache_hadoop_binaries(ctx, remote, hadoop_url):
-    log.info('_download_apache_hadoop_binaries: path {path} on host {host}'.format(path=hadoop_url, host=str(remote)))
-    fileName = 'apache-hadoop.tgz'
+
+def _download_apache_hadoop_bins(ctx, remote, hadoop_url):
+    """
+    download and untar the most recent apache hadoop binaries into
+    {testdir}/apache_hadoop
+    """
+    log.info(
+        '_download_apache_hadoop_bins: path {path} on host {host}'.format(
+        path=hadoop_url, host=str(remote)))
+    file_name = 'apache-hadoop.tgz'
     testdir = teuthology.get_testdir(ctx)
     remote.run(
         args=[
-            'mkdir', '-p', '-m0755', '{tdir}/apache_hadoop'.format(tdir=testdir),
+            'mkdir', '-p', '-m0755',
+            '{tdir}/apache_hadoop'.format(tdir=testdir),
             run.Raw('&&'),
             'echo',
-            '{fileName}'.format(fileName=fileName),
+            '{file_name}'.format(file_name=file_name),
             run.Raw('|'),
             'wget',
             '-nv',
@@ -298,21 +368,29 @@ def _download_apache_hadoop_binaries(ctx, remote, hadoop_url):
             # need to use --input-file to make wget respect --base
             '--input-file=-',
             run.Raw('|'),
-            'tar', '-xzf', '-', '-C', '{tdir}/apache_hadoop'.format(tdir=testdir),
+            'tar', '-xzf', '-', '-C',
+            '{tdir}/apache_hadoop'.format(tdir=testdir),
         ],
     )
 
-# download and untar the most recent Inktank hadoop binaries into {testdir}/hadoop
-def _download_inktank_hadoop_binaries(ctx, remote, hadoop_url):
-    log.info('_download_inktank_hadoop_binaries: path {path} on host {host}'.format(path=hadoop_url, host=str(remote)))
-    fileName = 'hadoop.tgz'
+
+def _download_inktank_hadoop_bins(ctx, remote, hadoop_url):
+    """
+    download and untar the most recent Inktank hadoop binaries into
+    {testdir}/hadoop
+    """
+    log.info(
+        '_download_inktank_hadoop_bins: path {path} on host {host}'.format(
+            path=hadoop_url, host=str(remote)))
+    file_name = 'hadoop.tgz'
     testdir = teuthology.get_testdir(ctx)
     remote.run(
         args=[
-            'mkdir', '-p', '-m0755', '{tdir}/inktank_hadoop'.format(tdir=testdir),
+            'mkdir', '-p', '-m0755',
+            '{tdir}/inktank_hadoop'.format(tdir=testdir),
             run.Raw('&&'),
             'echo',
-            '{fileName}'.format(fileName=fileName),
+            '{file_name}'.format(file_name=file_name),
             run.Raw('|'),
             'wget',
             '-nv',
@@ -321,35 +399,54 @@ def _download_inktank_hadoop_binaries(ctx, remote, hadoop_url):
             # need to use --input-file to make wget respect --base
             '--input-file=-',
             run.Raw('|'),
-            'tar', '-xzf', '-', '-C', '{tdir}/inktank_hadoop'.format(tdir=testdir),
+            'tar', '-xzf', '-', '-C',
+            '{tdir}/inktank_hadoop'.format(tdir=testdir),
         ],
     )
 
-# copy hadoop-cephfs.jar and hadoop-cephfs-test.jar into apache_hadoop
+
 def _copy_hadoop_cephfs_jars(ctx, remote, from_dir, to_dir):
+    """
+    copy hadoop-cephfs.jar and hadoop-cephfs-test.jar into apache_hadoop
+    """
     testdir = teuthology.get_testdir(ctx)
-    log.info('copy jars from {from_dir} to {to_dir} on host {host}'.format(from_dir=from_dir, to_dir=to_dir, host=str(remote)))
-    file_names = [ 'hadoop-cephfs.jar', 'hadoop-cephfs-test.jar' ] 
+    log.info('copy jars from {from_dir} to {to_dir} on host {host}'.format(
+            from_dir=from_dir, to_dir=to_dir, host=str(remote)))
+    file_names = ['hadoop-cephfs.jar', 'hadoop-cephfs-test.jar']
     for file_name in file_names:
         log.info('Copying file {file_name}'.format(file_name=file_name))
         remote.run(
-            args=[ 'cp', '{tdir}/{from_dir}/{file_name}'.format(tdir=testdir,from_dir=from_dir,file_name=file_name),
-                '{tdir}/{to_dir}/'.format(tdir=testdir,to_dir=to_dir)
+            args=['cp', '{tdir}/{from_dir}/{file_name}'.format(
+                tdir=testdir, from_dir=from_dir, file_name=file_name),
+                '{tdir}/{to_dir}/'.format(tdir=testdir, to_dir=to_dir)
             ],
         )
 
-def _node_binaries(ctx, config, remote, inktank_hadoop_bindir_url, apache_hadoop_bindir_url):
-    _download_inktank_hadoop_binaries(ctx, remote, inktank_hadoop_bindir_url)
-    _download_apache_hadoop_binaries(ctx, remote, apache_hadoop_bindir_url)
-    _copy_hadoop_cephfs_jars(ctx, remote, 'inktank_hadoop/build', 'apache_hadoop/build')
+
+def _node_binaries(ctx, remote, inktank_hadoop_bindir_url,
+        apache_hadoop_bindir_url):
+    """
+    Download and copy over the appropriate binaries and jar files.
+    The calls from binaries() end up spawning this function on remote sites.
+    """
+    _download_inktank_hadoop_bins(ctx, remote, inktank_hadoop_bindir_url)
+    _download_apache_hadoop_bins(ctx, remote, apache_hadoop_bindir_url)
+    _copy_hadoop_cephfs_jars(ctx, remote, 'inktank_hadoop/build',
+            'apache_hadoop/build')
+
 
 @contextlib.contextmanager
 def binaries(ctx, config):
+    """
+    Fetch the binaries from the gitbuilder, and spawn the download tasks on
+    the remote machines.
+    """
     path = config.get('path')
 
     if path is None:
         # fetch Apache Hadoop from gitbuilder
-        log.info('Fetching and unpacking Apache Hadoop binaries from gitbuilder...')
+        log.info(
+            'Fetching and unpacking Apache Hadoop binaries from gitbuilder...')
         apache_sha1, apache_hadoop_bindir_url = teuthology.get_ceph_binary_url(
             package='apache-hadoop',
             branch=config.get('apache_branch'),
@@ -364,28 +461,32 @@ def binaries(ctx, config):
         ctx.summary['apache-hadoop-sha1'] = apache_sha1
 
         # fetch Inktank Hadoop from gitbuilder
-        log.info('Fetching and unpacking Inktank Hadoop binaries from gitbuilder...')
-        inktank_sha1, inktank_hadoop_bindir_url = teuthology.get_ceph_binary_url(
-            package='hadoop',
-            branch=config.get('inktank_branch'),
-            tag=config.get('tag'),
-            sha1=config.get('sha1'),
-            flavor=config.get('flavor'),
-            format=config.get('format'),
-            dist=config.get('dist'),
-            arch=config.get('arch'),
-            )
+        log.info(
+            'Fetching and unpacking Inktank Hadoop binaries from gitbuilder...')
+        inktank_sha1, inktank_hadoop_bindir_url = \
+            teuthology.get_ceph_binary_url(
+                package='hadoop',
+                branch=config.get('inktank_branch'),
+                tag=config.get('tag'),
+                sha1=config.get('sha1'),
+                flavor=config.get('flavor'),
+                format=config.get('format'),
+                dist=config.get('dist'),
+                arch=config.get('arch'),
+                )
         log.info('inktank_hadoop_bindir_url %s' % (inktank_hadoop_bindir_url))
         ctx.summary['inktank-hadoop-sha1'] = inktank_sha1
 
     else:
-        raise Exception("The hadoop task does not support the path argument at present")
+        raise Exception(
+                "The hadoop task does not support the path argument at present")
 
-    with parallel() as p:
-        hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    with parallel() as parallel_task:
+        hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
         # these can happen independently
-        for remote in hadoopNodes.remotes.iterkeys():
-            p.spawn(_node_binaries, ctx, config, remote, inktank_hadoop_bindir_url, apache_hadoop_bindir_url)
+        for remote in hadoop_nodes.remotes.iterkeys():
+            parallel_task.spawn(_node_binaries, ctx, remote,
+                    inktank_hadoop_bindir_url, apache_hadoop_bindir_url)
 
     try:
         yield
@@ -393,29 +494,34 @@ def binaries(ctx, config):
         log.info('Removing hadoop binaries...')
         run.wait(
             ctx.cluster.run(
-                args=[ 'rm', '-rf', '--', '{tdir}/apache_hadoop'.format(tdir=teuthology.get_testdir(ctx))],
+                args=['rm', '-rf', '--', '{tdir}/apache_hadoop'.format(
+                        tdir=teuthology.get_testdir(ctx))],
                 wait=False,
                 ),
             )
         run.wait(
             ctx.cluster.run(
-                args=[ 'rm', '-rf', '--', '{tdir}/inktank_hadoop'.format(tdir=teuthology.get_testdir(ctx))],
+                args=['rm', '-rf', '--', '{tdir}/inktank_hadoop'.format(
+                        tdir=teuthology.get_testdir(ctx))],
                 wait=False,
                 ),
             )
 
-## A Hadoop NameNode will stay in safe mode for 30 seconds by default.
-## This method blocks until the NameNode is out of safe mode.
+
 @contextlib.contextmanager
 def out_of_safemode(ctx, config):
-
+    """
+    A Hadoop NameNode will stay in safe mode for 30 seconds by default.
+    This method blocks until the NameNode is out of safe mode.
+    """
     if config.get('hdfs'):
         log.info('Waiting for the Namenode to exit safe mode...')
 
         master = _get_master(ctx)
         remote, _ = master
         remote.run(
-            args=["{tdir}/apache_hadoop/bin/hadoop".format(tdir=teuthology.get_testdir(ctx)),
+            args=["{tdir}/apache_hadoop/bin/hadoop".format(
+                  tdir=teuthology.get_testdir(ctx)),
                   "dfsadmin",
                   "-safemode",
                   "wait"],
@@ -429,6 +535,7 @@ def out_of_safemode(ctx, config):
     finally:
         pass
 
+
 @contextlib.contextmanager
 def task(ctx, config):
     """
@@ -446,7 +553,7 @@ def task(ctx, config):
     - hadoop:
 
     Or if you want to use HDFS under Hadoop, this will configure Hadoop
-    for HDFS and start it along with MapReduce. Note that it does not 
+    for HDFS and start it along with MapReduce. Note that it does not
     require Ceph be installed.
 
     roles:
@@ -458,13 +565,13 @@ def task(ctx, config):
     - hadoop:
         hdfs: True
 
-    This task requires exactly one hadoop.master be specified 
+    This task requires exactly one hadoop.master be specified
     and at least one hadoop.slave.
 
-    This does *not* do anything with the Hadoop setup. To run wordcount, 
+    This does *not* do anything with the Hadoop setup. To run wordcount,
     you could use pexec like so (after the hadoop task):
 
-    - pexec: 
+    - pexec:
         hadoop.slave.0:
           - mkdir -p /tmp/hadoop_input
           - wget http://ceph.com/qa/hadoop_input_files.tar -O /tmp/hadoop_input/files.tar
@@ -473,15 +580,17 @@ def task(ctx, config):
           - {tdir}/hadoop/bin/hadoop fs -put /tmp/hadoop_input/*txt wordcount_input/
           - {tdir}/hadoop/bin/hadoop jar {tdir}/hadoop/build/hadoop-example*jar wordcount wordcount_input wordcount_output
           - rm -rf /tmp/hadoop_input
-    """.format(tdir=teuthology.get_testdir(ctx))
+
+    Note: {tdir} in the above example is the teuthology test directory.
+    """
 
     if config is None:
         config = {}
     assert isinstance(config, dict), \
         "task hadoop only supports a dictionary for configuration"
 
-    dist = 'precise' 
-    format = 'jar'
+    dist = 'precise'
+    format_type = 'jar'
     arch = 'x86_64'
     flavor = config.get('flavor', 'basic')
 
@@ -494,27 +603,27 @@ def task(ctx, config):
     if config.get('apache_hadoop_branch') is not None:
         apache_branch = config.get('apache_hadoop_branch')
     else:
-        apache_branch = 'branch-1.0' # hadoop branch to acquire
+        apache_branch = 'branch-1.0'  # hadoop branch to acquire
 
     inktank_branch = None
     if config.get('inktank_hadoop_branch') is not None:
         inktank_branch = config.get('inktank_hadoop_branch')
     else:
-        inktank_branch = 'cephfs/branch-1.0' # default branch name
+        inktank_branch = 'cephfs/branch-1.0'  # default branch name
 
     # replace any '/' with a '_' to match the artifact paths
-    inktank_branch = inktank_branch.replace('/','_')
-    apache_branch = apache_branch.replace('/','_')
+    inktank_branch = inktank_branch.replace('/', '_')
+    apache_branch = apache_branch.replace('/', '_')
 
     with contextutil.nested(
-        lambda: validate_config(ctx=ctx, config=config),
+        lambda: validate_cluster(ctx=ctx),
         lambda: binaries(ctx=ctx, config=dict(
                 tag=config.get('tag'),
                 sha1=config.get('sha1'),
                 path=config.get('path'),
                 flavor=flavor,
                 dist=config.get('dist', dist),
-                format=format,
+                format=format_type,
                 arch=arch,
                 apache_branch=apache_branch,
                 inktank_branch=inktank_branch,