]> git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
hadoop: 2x
authorNoah Watkins <noahwatkins@gmail.com>
Mon, 19 Jan 2015 22:25:21 +0000 (14:25 -0800)
committerNoah Watkins <noahwatkins@gmail.com>
Wed, 11 Feb 2015 21:35:55 +0000 (13:35 -0800)
Signed-off-by: Noah Watkins <noahwatkins@gmail.com>
teuthology/task/hadoop.py

index 30e4c69523c0faba9049715cda52d1eee2615e25..f6460df6a9679d8a280860bf21ca945cefa82cc7 100644 (file)
@@ -1,13 +1,6 @@
-"""
-Hadoop task
-
-Install and cofigure hadoop -- requires that Ceph is already installed and
-already running.
-"""
 from cStringIO import StringIO
 import contextlib
 import logging
-
 from teuthology import misc as teuthology
 from teuthology import contextutil
 from teuthology.parallel import parallel
@@ -15,621 +8,377 @@ from ..orchestra import run
 
 log = logging.getLogger(__name__)
 
-
-@contextlib.contextmanager
-def validate_cluster(ctx):
-    """
-    Check that there is exactly one master and at least one slave configured
-    """
-    log.info('Vaidating Hadoop configuration')
-    slaves = ctx.cluster.only(teuthology.is_type('hadoop.slave'))
-
-    if (len(slaves.remotes) < 1):
-        raise Exception("At least one hadoop.slave must be specified")
-    else:
-        log.info(str(len(slaves.remotes)) + " slaves specified")
-
-    masters = ctx.cluster.only(teuthology.is_type('hadoop.master'))
-    if (len(masters.remotes) == 1):
-        pass
+HADOOP_2x_URL = "http://apache.osuosl.org/hadoop/common/hadoop-2.5.2/hadoop-2.5.2.tar.gz"
+
+def get_slaves_data(ctx):
+    tempdir = teuthology.get_testdir(ctx)
+    path = "{tdir}/hadoop/etc/hadoop/slaves".format(tdir=tempdir)
+    nodes = ctx.cluster.only(teuthology.is_type('hadoop.slave'))
+    hosts = [s.ssh.get_transport().getpeername()[0] for s in nodes.remotes]
+    data = '\n'.join(hosts)
+    return path, data
+
+def get_masters_data(ctx):
+    tempdir = teuthology.get_testdir(ctx)
+    path = "{tdir}/hadoop/etc/hadoop/masters".format(tdir=tempdir)
+    nodes = ctx.cluster.only(teuthology.is_type('hadoop.master'))
+    hosts = [s.ssh.get_transport().getpeername()[0] for s in nodes.remotes]
+    data = '\n'.join(hosts)
+    return path, data
+
+def get_core_site_data(ctx, config):
+    tempdir = teuthology.get_testdir(ctx)
+    path = "{tdir}/hadoop/etc/hadoop/core-site.xml".format(tdir=tempdir)
+    nodes = ctx.cluster.only(teuthology.is_type('hadoop.master'))
+    host = [s.ssh.get_transport().getpeername()[0] for s in nodes.remotes][0]
+
+    if config.get('hdfs', False):
+        data_tmpl = """
+<configuration>
+     <property>
+         <name>fs.defaultFS</name>
+         <value>hdfs://{namenode}:9000</value>
+     </property>
+     <property>
+         <name>hadoop.tmp.dir</name>
+         <value>{tdir}/hadoop_tmp</value>
+     </property>
+</configuration>
+"""
     else:
-        raise Exception(
-           "Exactly one hadoop.master must be specified. Currently there are "
-           + str(len(masters.remotes)))
-
-    try:
-        yield
+        data_tmpl = """
+<configuration>
+  <property>
+    <name>fs.default.name</name>
+    <value>ceph://{namenode}:6789/</value>
+  </property>
+  <property>
+    <name>fs.defaultFS</name>
+    <value>ceph://{namenode}:6789/</value>
+  </property>
+  <property>
+    <name>ceph.conf.file</name>
+    <value>/etc/ceph/ceph.conf</value>
+  </property>
+  <property>
+    <name>ceph.mon.address</name>
+    <value>{namenode}:6789</value>
+  </property>
+  <property>
+    <name>ceph.auth.id</name>
+    <value>admin</value>
+  </property>
+  <property>
+    <name>ceph.data.pools</name>
+    <value>cephfs_data</value>
+  </property>
+  <property>
+    <name>fs.AbstractFileSystem.ceph.impl</name>
+    <value>org.apache.hadoop.fs.ceph.CephFs</value>
+  </property>
+  <property>
+    <name>fs.ceph.impl</name>
+    <value>org.apache.hadoop.fs.ceph.CephFileSystem</value>
+  </property>
+  <property>
+    <name>hadoop.tmp.dir</name>
+    <value>{tdir}/hadoop_tmp$</value>
+  </property>
+</configuration>
+"""
+    return path, data_tmpl.format(tdir=tempdir, namenode=host)
 
-    finally:
-        pass
-
-
-def write_hadoop_env(ctx):
-    """
-    Add required entries to conf/hadoop-env.sh
-    """
-    hadoop_envfile = "{tdir}/apache_hadoop/conf/hadoop-env.sh".format(
-            tdir=teuthology.get_testdir(ctx))
-
-    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote in hadoop_nodes.remotes:
-        teuthology.write_file(remote, hadoop_envfile,
-'''export JAVA_HOME=/usr/lib/jvm/default-java
-export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/share/java/libcephfs.jar:{tdir}/apache_hadoop/build/hadoop-core*.jar:{tdir}/inktank_hadoop/build/hadoop-cephfs.jar
-export HADOOP_NAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_NAMENODE_OPTS"
-export HADOOP_SECONDARYNAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_SECONDARYNAMENODE_OPTS"
-export HADOOP_DATANODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_DATANODE_OPTS"
-export HADOOP_BALANCER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_BALANCER_OPTS"
-export HADOOP_JOBTRACKER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_JOBTRACKER_OPTS"
-'''.format(tdir=teuthology.get_testdir(ctx)))
-        log.info("wrote file: " + hadoop_envfile + " to host: " + str(remote))
-
-
-def write_core_site(ctx, config):
-    """
-    Add required entries to conf/core-site.xml
-    """
-    testdir = teuthology.get_testdir(ctx)
-    core_site_file = "{tdir}/apache_hadoop/conf/core-site.xml".format(
-            tdir=testdir)
-
-    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote in hadoop_nodes.remotes:
-
-        # check the config to see if we should use hdfs or ceph
-        default_fs_string = ""
-        if config.get('hdfs'):
-            default_fs_string = 'hdfs://{master_ip}:54310'.format(
-                    master_ip=get_hadoop_master_ip(ctx))
-        else:
-            default_fs_string = 'ceph:///'
-
-        teuthology.write_file(remote, core_site_file,
-'''<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!-- Put site-specific property overrides in this file.  -->
+def get_mapred_site_data(ctx):
+    data_tmpl = """
 <configuration>
-    <property>
-        <name>hadoop.tmp.dir</name>
-        <value>/tmp/hadoop/tmp</value>
-    </property>
-    <property>
-        <name>fs.default.name</name>
-        <value>{default_fs}</value>
-    </property>
-    <property>
-      <name>ceph.conf.file</name>
-      <value>/etc/ceph/ceph.conf</value>
-    </property>
-    <property>
-      <name>fs.ceph.impl</name>
-      <value>org.apache.hadoop.fs.ceph.CephFileSystem</value>
+     <property>
+         <name>mapred.job.tracker</name>
+         <value>{namenode}:9001</value>
+     </property>
+     <property>
+        <name>mapreduce.framework.name</name>
+        <value>yarn</value>
     </property>
 </configuration>
-'''.format(tdir=teuthology.get_testdir(ctx), default_fs=default_fs_string))
-
-        log.info("wrote file: " + core_site_file + " to host: " + str(remote))
-
-
-def get_hadoop_master_ip(ctx):
-    """
-    finds the hadoop.master in the ctx and then pulls out just the IP address
-    """
-    remote, _ = _get_master(ctx)
-    master_name, master_port = remote.ssh.get_transport().getpeername()
-    log.info('master name: {name} port {port}'.format(name=master_name,
-            port=master_port))
-    return master_name
-
-
-def write_mapred_site(ctx):
-    """
-    Add required entries to conf/mapred-site.xml
-    """
-    mapred_site_file = "{tdir}/apache_hadoop/conf/mapred-site.xml".format(
-            tdir=teuthology.get_testdir(ctx))
-
-    master_ip = get_hadoop_master_ip(ctx)
-    log.info('adding host {remote} as jobtracker'.format(remote=master_ip))
-
-    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote in hadoop_nodes.remotes:
-        teuthology.write_file(remote, mapred_site_file,
-'''<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!-- Put site-specific property overrides in this file. -->
+"""
+    tempdir = teuthology.get_testdir(ctx)
+    path = "{tdir}/hadoop/etc/hadoop/mapred-site.xml".format(tdir=tempdir)
+    nodes = ctx.cluster.only(teuthology.is_type('hadoop.master'))
+    hosts = [s.ssh.get_transport().getpeername()[0] for s in nodes.remotes]
+    assert len(hosts) == 1
+    host = hosts[0]
+    return path, data_tmpl.format(namenode=host)
+
+def get_yarn_site_data(ctx):
+    data_tmpl = """
+<configuration>
+<property>
+ <name>yarn.resourcemanager.resourcetracker.address</name>
+ <value>{namenode}:8025</value>  
+</property>
+<property>
+ <name>yarn.resourcemanager.scheduler.address</name>
+ <value>{namenode}:8030</value>  
+</property>
+<property>
+ <name>yarn.resourcemanager.address</name>
+ <value>{namenode}:8050</value>  
+</property>
+<property>
+ <name>yarn.resourcemanager.admin.address</name>
+ <value>{namenode}:8041</value>  
+</property>
+<property>
+  <name>yarn.resourcemanager.hostname</name>
+  <value>{namenode}</value>
+</property>
+<property>
+  <name>yarn.nodemanager.aux-services</name>
+  <value>mapreduce_shuffle</value>
+</property>
+</configuration>
+"""
+    tempdir = teuthology.get_testdir(ctx)
+    path = "{tdir}/hadoop/etc/hadoop/yarn-site.xml".format(tdir=tempdir)
+    nodes = ctx.cluster.only(teuthology.is_type('hadoop.master'))
+    hosts = [s.ssh.get_transport().getpeername()[0] for s in nodes.remotes]
+    assert len(hosts) == 1
+    host = hosts[0]
+    return path, data_tmpl.format(namenode=host)
+
+def get_hdfs_site_data(ctx):
+    data = """
 <configuration>
-    <property>
-        <name>mapred.job.tracker</name>
-        <value>{remote}:54311</value>
-    </property>
+     <property>
+         <name>dfs.replication</name>
+         <value>1</value>
+     </property>
 </configuration>
-'''.format(remote=master_ip))
+"""
+    tempdir = teuthology.get_testdir(ctx)
+    path = "{tdir}/hadoop/etc/hadoop/hdfs-site.xml".format(tdir=tempdir)
+    return path, data
+
+def configure(ctx, config, hadoops, hadoop_dir):
+    tempdir = teuthology.get_testdir(ctx)
+
+    log.info("Writing Hadoop slaves file...")
+    for remote in hadoops.remotes:
+        path, data = get_slaves_data(ctx)
+        teuthology.write_file(remote, path, StringIO(data))
+
+    log.info("Writing Hadoop masters file...")
+    for remote in hadoops.remotes:
+        path, data = get_masters_data(ctx)
+        teuthology.write_file(remote, path, StringIO(data))
+
+    log.info("Writing Hadoop core-site.xml file...")
+    for remote in hadoops.remotes:
+        path, data = get_core_site_data(ctx, config)
+        teuthology.write_file(remote, path, StringIO(data))
+
+    log.info("Writing Hadoop yarn-site.xml file...")
+    for remote in hadoops.remotes:
+        path, data = get_yarn_site_data(ctx)
+        teuthology.write_file(remote, path, StringIO(data))
+
+    log.info("Writing Hadoop hdfs-site.xml file...")
+    for remote in hadoops.remotes:
+        path, data = get_hdfs_site_data(ctx)
+        teuthology.write_file(remote, path, StringIO(data))
+
+    log.info("Writing Hadoop mapred-site.xml file...")
+    for remote in hadoops.remotes:
+        path, data = get_mapred_site_data(ctx)
+        teuthology.write_file(remote, path, StringIO(data))
+
+    log.info("Setting JAVA_HOME in hadoop-env.sh...")
+    for remote in hadoops.remotes:
+        path = "{tdir}/hadoop/etc/hadoop/hadoop-env.sh".format(tdir=tempdir)
+        data = "JAVA_HOME=/usr/lib/jvm/default-java\n" # FIXME: RHEL?
+        teuthology.prepend_lines_to_file(remote, path, data)
 
-        log.info("wrote file: " + mapred_site_file + " to host: " + str(remote))
+@contextlib.contextmanager
+def install_hadoop(ctx, config):
+    testdir = teuthology.get_testdir(ctx)
 
+    log.info("Downloading Hadoop...")
+    hadoop_tarball = "{tdir}/hadoop.tar.gz".format(tdir=testdir)
+    hadoops = ctx.cluster.only(teuthology.is_type('hadoop'))
+    run.wait(
+        hadoops.run(
+            args = [
+                'wget',
+                '-nv',
+                '-O',
+                hadoop_tarball,
+                HADOOP_2x_URL
+            ],
+            wait = False,
+            )
+        )
 
-def write_hdfs_site(ctx):
-    """
-    Add required entries to conf/hdfs-site.xml
-    """
-    hdfs_site_file = "{tdir}/apache_hadoop/conf/hdfs-site.xml".format(
-            tdir=teuthology.get_testdir(ctx))
+    log.info("Create directory for Hadoop install...")
+    hadoop_dir = "{tdir}/hadoop".format(tdir=testdir)
+    run.wait(
+        hadoops.run(
+            args = [
+                'mkdir',
+                hadoop_dir
+            ],
+            wait = False,
+            )
+        )
 
-    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote in hadoop_nodes.remotes:
-        teuthology.write_file(remote, hdfs_site_file,
-'''<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!-- Put site-specific property overrides in this file. -->
-<configuration>
-    <property>
-        <name>dfs.replication</name>
-        <value>1</value>
-    </property>
-</configuration>
-''')
-        log.info("wrote file: " + hdfs_site_file + " to host: " + str(remote))
-
-
-def write_slaves(ctx):
-    """
-    Add required entries to conf/slaves
-    These nodes host TaskTrackers and DataNodes
-    """
-    log.info('Setting up slave nodes...')
-
-    slaves_file = "{tdir}/apache_hadoop/conf/slaves".format(
-            tdir=teuthology.get_testdir(ctx))
-    tmp_file = StringIO()
-
-    slaves = ctx.cluster.only(teuthology.is_type('hadoop.slave'))
-    for remote in slaves.remotes:
-        tmp_file.write('{remote}\n'.format(
-                remote=remote.ssh.get_transport().getpeername()[0]))
-
-    tmp_file.seek(0)
-
-    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote in hadoop_nodes.remotes:
-        teuthology.write_file(remote=remote, path=slaves_file, data=tmp_file)
-        tmp_file.seek(0)
-        log.info("wrote file: " + slaves_file + " to host: " + str(remote))
-
-
-def write_master(ctx):
-    """
-    Add required entries to conf/masters
-    These nodes host JobTrackers and Namenodes
-    """
-    masters_file = "{tdir}/apache_hadoop/conf/masters".format(
-            tdir=teuthology.get_testdir(ctx))
-    master = _get_master(ctx)
-    master_remote, _ = master
-
-    hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-    for remote in hadoop_nodes.remotes:
-        teuthology.write_file(remote, masters_file, '{master_host}\n'.format(
-                master_host=master_remote.ssh.get_transport().getpeername()[0]))
-        log.info("wrote file: " + masters_file + " to host: " + str(remote))
-
-
-def _configure_hadoop(ctx, config):
-    """
-    Call the various functions that configure Hadoop
-    """
-    log.info('writing out config files')
-
-    write_hadoop_env(ctx)
-    write_core_site(ctx, config)
-    write_mapred_site(ctx)
-    write_hdfs_site(ctx)
-    write_slaves(ctx)
-    write_master(ctx)
+    log.info("Unpacking Hadoop...")
+    run.wait(
+        hadoops.run(
+            args = [
+                'tar',
+                'xzf',
+                hadoop_tarball,
+                '--strip-components=1',
+                '-C',
+                hadoop_dir
+            ],
+            wait = False,
+            )
+        )
 
+    log.info("Removing Hadoop download...")
+    run.wait(
+        hadoops.run(
+            args = [
+                'rm',
+                hadoop_tarball
+            ],
+            wait = False,
+            )
+        )
 
-@contextlib.contextmanager
-def configure_hadoop(ctx, config):
-    """
-    Call the various functions that configure Hadoop, and handle the
-    startup of hadoop and clean up of temporary files if this is an hdfs.
-    """
-    _configure_hadoop(ctx, config)
-    log.info('config.get(hdfs): {hdfs}'.format(hdfs=config.get('hdfs')))
-
-    if config.get('hdfs'):
-        log.info('hdfs option specified. Setting up hdfs')
-
-        # let's run this from the master
-        master = _get_master(ctx)
-        remote, _ = master
-        remote.run(
-        args=["{tdir}/apache_hadoop/bin/hadoop".format(
-                tdir=teuthology.get_testdir(ctx)),
-              "namenode",
-              "-format"],
-            wait=True,
+    log.info("Create Hadoop temporary directory...")
+    hadoop_tmp_dir = "{tdir}/hadoop_tmp".format(tdir=testdir)
+    run.wait(
+        hadoops.run(
+            args = [
+                'mkdir',
+                hadoop_tmp_dir
+            ],
+            wait = False,
+            )
         )
 
-    log.info('done setting up hadoop')
+    configure(ctx, config, hadoops, hadoop_dir)
 
     try:
         yield
-
     finally:
-        log.info('Removing hdfs directory')
         run.wait(
-            ctx.cluster.run(
-                args=[
+            hadoops.run(
+                args = [
                     'rm',
                     '-rf',
-                    '/tmp/hadoop',
-                    ],
-                wait=False,
-                ),
+                    hadoop_dir,
+                    hadoop_tmp_dir
+                ],
+                wait = False,
+                )
             )
 
-
-def _start_hadoop(ctx, remote, config):
-    """
-    remotely start hdfs if specified, and start mapred.
-    """
+@contextlib.contextmanager
+def start_hadoop(ctx, config):
     testdir = teuthology.get_testdir(ctx)
-    if config.get('hdfs'):
-        remote.run(
-            args=['{tdir}/apache_hadoop/bin/start-dfs.sh'.format(
-                    tdir=testdir), ],
-            wait=True,
+    hadoop_dir = "{tdir}/hadoop/".format(tdir=testdir)
+    masters = ctx.cluster.only(teuthology.is_type('hadoop.master'))
+    assert len(masters.remotes) == 1
+    master = masters.remotes.keys()[0]
+
+    log.info("Formatting HDFS...")
+    master.run(
+        args = [
+            hadoop_dir + "bin/hadoop",
+            "namenode",
+            "-format"
+        ],
+        wait = True,
         )
-        log.info('done starting hdfs')
-
-    remote.run(
-        args=['{tdir}/apache_hadoop/bin/start-mapred.sh'.format(
-                tdir=testdir), ],
-        wait=True,
-    )
-    log.info('done starting mapred')
-
 
-def _stop_hadoop(ctx, remote, config):
-    """
-    remotely stop mapred, and if hdfs if specified, stop the hdfs handler too.
-    """
-    testdir = teuthology.get_testdir(ctx)
-    remote.run(
-        args=['{tdir}/apache_hadoop/bin/stop-mapred.sh'.format(tdir=testdir), ],
-        wait=True,
-    )
-
-    if config.get('hdfs'):
-        remote.run(
-            args=['{tdir}/apache_hadoop/bin/stop-dfs.sh'.format(
-                tdir=testdir), ],
-            wait=True,
+    log.info("Stopping Hadoop daemons")
+    master.run(
+        args = [
+            hadoop_dir + "sbin/stop-yarn.sh"
+        ],
+        wait = True,
         )
 
-    log.info('done stopping hadoop')
-
-
-def _get_master(ctx):
-    """
-    Return the hadoop master.  If more than one is found, fail an assertion
-    """
-    master = ctx.cluster.only(teuthology.is_type('hadoop.master'))
-    assert 1 == len(master.remotes.items()), \
-            'There must be exactly 1 hadoop.master configured'
-
-    return master.remotes.items()[0]
-
+    master.run(
+        args = [
+            hadoop_dir + "sbin/stop-dfs.sh"
+        ],
+        wait = True,
+        )
 
-@contextlib.contextmanager
-def start_hadoop(ctx, config):
-    """
-    Handle the starting and stopping of hadoop
-    """
-    master = _get_master(ctx)
-    remote, _ = master
+    log.info("Starting HDFS...")
+    master.run(
+        args = [
+            hadoop_dir + "sbin/start-dfs.sh"
+        ],
+        wait = True,
+        )
 
-    log.info('Starting hadoop on {remote}\n'.format(
-            remote=remote.ssh.get_transport().getpeername()[0]))
-    _start_hadoop(ctx, remote, config)
+    log.info("Starting YARN...")
+    master.run(
+        args = [
+            hadoop_dir + "sbin/start-yarn.sh"
+        ],
+        wait = True,
+        )
 
     try:
         yield
 
     finally:
-        log.info('Running stop-mapred.sh on {remote}'.format(
-                remote=remote.ssh.get_transport().getpeername()[0]))
-        _stop_hadoop(ctx, remote, config)
-
-
-def _download_apache_hadoop_bins(ctx, remote, hadoop_url):
-    """
-    download and untar the most recent apache hadoop binaries into
-    {testdir}/apache_hadoop
-    """
-    log.info(
-        '_download_apache_hadoop_bins: path {path} on host {host}'.format(
-        path=hadoop_url, host=str(remote)))
-    file_name = 'apache-hadoop.tgz'
-    testdir = teuthology.get_testdir(ctx)
-    remote.run(
-        args=[
-            'mkdir', '-p', '-m0755',
-            '{tdir}/apache_hadoop'.format(tdir=testdir),
-            run.Raw('&&'),
-            'echo',
-            '{file_name}'.format(file_name=file_name),
-            run.Raw('|'),
-            'wget',
-            '-nv',
-            '-O-',
-            '--base={url}'.format(url=hadoop_url),
-            # need to use --input-file to make wget respect --base
-            '--input-file=-',
-            run.Raw('|'),
-            'tar', '-xzf', '-', '-C',
-            '{tdir}/apache_hadoop'.format(tdir=testdir),
-        ],
-    )
-
-
-def _download_inktank_hadoop_bins(ctx, remote, hadoop_url):
-    """
-    download and untar the most recent Inktank hadoop binaries into
-    {testdir}/hadoop
-    """
-    log.info(
-        '_download_inktank_hadoop_bins: path {path} on host {host}'.format(
-            path=hadoop_url, host=str(remote)))
-    file_name = 'hadoop.tgz'
-    testdir = teuthology.get_testdir(ctx)
-    remote.run(
-        args=[
-            'mkdir', '-p', '-m0755',
-            '{tdir}/inktank_hadoop'.format(tdir=testdir),
-            run.Raw('&&'),
-            'echo',
-            '{file_name}'.format(file_name=file_name),
-            run.Raw('|'),
-            'wget',
-            '-nv',
-            '-O-',
-            '--base={url}'.format(url=hadoop_url),
-            # need to use --input-file to make wget respect --base
-            '--input-file=-',
-            run.Raw('|'),
-            'tar', '-xzf', '-', '-C',
-            '{tdir}/inktank_hadoop'.format(tdir=testdir),
-        ],
-    )
-
+        log.info("Stopping Hadoop daemons")
 
-def _copy_hadoop_cephfs_jars(ctx, remote, from_dir, to_dir):
-    """
-    copy hadoop-cephfs.jar and hadoop-cephfs-test.jar into apache_hadoop
-    """
-    testdir = teuthology.get_testdir(ctx)
-    log.info('copy jars from {from_dir} to {to_dir} on host {host}'.format(
-            from_dir=from_dir, to_dir=to_dir, host=str(remote)))
-    file_names = ['hadoop-cephfs.jar', 'hadoop-cephfs-test.jar']
-    for file_name in file_names:
-        log.info('Copying file {file_name}'.format(file_name=file_name))
-        remote.run(
-            args=['cp', '{tdir}/{from_dir}/{file_name}'.format(
-                tdir=testdir, from_dir=from_dir, file_name=file_name),
-                '{tdir}/{to_dir}/'.format(tdir=testdir, to_dir=to_dir)
-            ],
-        )
-
-
-def _node_binaries(ctx, remote, inktank_hadoop_bindir_url,
-        apache_hadoop_bindir_url):
-    """
-    Download and copy over the appropriate binaries and jar files.
-    The calls from binaries() end up spawning this function on remote sites.
-    """
-    _download_inktank_hadoop_bins(ctx, remote, inktank_hadoop_bindir_url)
-    _download_apache_hadoop_bins(ctx, remote, apache_hadoop_bindir_url)
-    _copy_hadoop_cephfs_jars(ctx, remote, 'inktank_hadoop/build',
-            'apache_hadoop/build')
-
-
-@contextlib.contextmanager
-def binaries(ctx, config):
-    """
-    Fetch the binaries from the gitbuilder, and spawn the download tasks on
-    the remote machines.
-    """
-    path = config.get('path')
-
-    if path is None:
-        # fetch Apache Hadoop from gitbuilder
-        log.info(
-            'Fetching and unpacking Apache Hadoop binaries from gitbuilder...')
-        apache_sha1, apache_hadoop_bindir_url = teuthology.get_ceph_binary_url(
-            package='apache-hadoop',
-            branch=config.get('apache_branch'),
-            tag=config.get('tag'),
-            sha1=config.get('sha1'),
-            flavor=config.get('flavor'),
-            format=config.get('format'),
-            dist=config.get('dist'),
-            arch=config.get('arch'),
+        master.run(
+            args = [
+                hadoop_dir + "sbin/stop-yarn.sh"
+                ],
+            wait = True,
             )
-        log.info('apache_hadoop_bindir_url %s' % (apache_hadoop_bindir_url))
-        ctx.summary['apache-hadoop-sha1'] = apache_sha1
-
-        # fetch Inktank Hadoop from gitbuilder
-        log.info(
-            'Fetching and unpacking Inktank Hadoop binaries from gitbuilder...')
-        inktank_sha1, inktank_hadoop_bindir_url = \
-            teuthology.get_ceph_binary_url(
-                package='hadoop',
-                branch=config.get('inktank_branch'),
-                tag=config.get('tag'),
-                sha1=config.get('sha1'),
-                flavor=config.get('flavor'),
-                format=config.get('format'),
-                dist=config.get('dist'),
-                arch=config.get('arch'),
-                )
-        log.info('inktank_hadoop_bindir_url %s' % (inktank_hadoop_bindir_url))
-        ctx.summary['inktank-hadoop-sha1'] = inktank_sha1
-
-    else:
-        raise Exception(
-                "The hadoop task does not support the path argument at present")
-
-    with parallel() as parallel_task:
-        hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
-        # these can happen independently
-        for remote in hadoop_nodes.remotes.iterkeys():
-            parallel_task.spawn(_node_binaries, ctx, remote,
-                    inktank_hadoop_bindir_url, apache_hadoop_bindir_url)
 
-    try:
-        yield
-    finally:
-        log.info('Removing hadoop binaries...')
-        run.wait(
-            ctx.cluster.run(
-                args=['rm', '-rf', '--', '{tdir}/apache_hadoop'.format(
-                        tdir=teuthology.get_testdir(ctx))],
-                wait=False,
-                ),
+        master.run(
+            args = [
+                hadoop_dir + "sbin/stop-dfs.sh"
+                ],
+            wait = True,
             )
+
         run.wait(
             ctx.cluster.run(
-                args=['rm', '-rf', '--', '{tdir}/inktank_hadoop'.format(
-                        tdir=teuthology.get_testdir(ctx))],
-                wait=False,
-                ),
+                args = [
+                    'sudo',
+                    'skill',
+                    '-9',
+                    'java'
+                    ],
+                wait = False
+                )
             )
 
-
-@contextlib.contextmanager
-def out_of_safemode(ctx, config):
-    """
-    A Hadoop NameNode will stay in safe mode for 30 seconds by default.
-    This method blocks until the NameNode is out of safe mode.
-    """
-    if config.get('hdfs'):
-        log.info('Waiting for the Namenode to exit safe mode...')
-
-        master = _get_master(ctx)
-        remote, _ = master
-        remote.run(
-            args=["{tdir}/apache_hadoop/bin/hadoop".format(
-                  tdir=teuthology.get_testdir(ctx)),
-                  "dfsadmin",
-                  "-safemode",
-                  "wait"],
-            wait=True,
-        )
-    else:
-        pass
-
-    try:
-        yield
-    finally:
-        pass
-
-
 @contextlib.contextmanager
 def task(ctx, config):
-    """
-    Set up and tear down a Hadoop cluster.
-
-    This depends on either having ceph installed prior to hadoop, like so:
-
-    roles:
-    - [mon.0, mds.0, osd.0, hadoop.master.0]
-    - [mon.1, osd.1, hadoop.slave.0]
-    - [mon.2, hadoop.slave.1]
-
-    tasks:
-    - ceph:
-    - hadoop:
-
-    Or if you want to use HDFS under Hadoop, this will configure Hadoop
-    for HDFS and start it along with MapReduce. Note that it does not
-    require Ceph be installed.
-
-    roles:
-    - [hadoop.master.0]
-    - [hadoop.slave.0]
-    - [hadoop.slave.1]
-
-    tasks:
-    - hadoop:
-        hdfs: True
-
-    This task requires exactly one hadoop.master be specified
-    and at least one hadoop.slave.
-
-    This does *not* do anything with the Hadoop setup. To run wordcount,
-    you could use pexec like so (after the hadoop task):
-
-    - pexec:
-        hadoop.slave.0:
-          - mkdir -p /tmp/hadoop_input
-          - wget http://ceph.com/qa/hadoop_input_files.tar -O /tmp/hadoop_input/files.tar
-          - cd /tmp/hadoop_input/; tar -xf /tmp/hadoop_input/files.tar
-          - {tdir}/hadoop/bin/hadoop fs -mkdir wordcount_input
-          - {tdir}/hadoop/bin/hadoop fs -put /tmp/hadoop_input/*txt wordcount_input/
-          - {tdir}/hadoop/bin/hadoop jar {tdir}/hadoop/build/hadoop-example*jar wordcount wordcount_input wordcount_output
-          - rm -rf /tmp/hadoop_input
-
-    Note: {tdir} in the above example is the teuthology test directory.
-    """
-
     if config is None:
         config = {}
-    assert isinstance(config, dict), \
-        "task hadoop only supports a dictionary for configuration"
-
-    dist = 'precise'
-    format_type = 'jar'
-    arch = 'x86_64'
-    flavor = config.get('flavor', 'basic')
-
-    ctx.summary['flavor'] = flavor
+    assert isinstance(config, dict), "task hadoop config must be dictionary"
 
     overrides = ctx.config.get('overrides', {})
     teuthology.deep_merge(config, overrides.get('hadoop', {}))
 
-    apache_branch = None
-    if config.get('apache_hadoop_branch') is not None:
-        apache_branch = config.get('apache_hadoop_branch')
-    else:
-        apache_branch = 'branch-1.0'  # hadoop branch to acquire
-
-    inktank_branch = None
-    if config.get('inktank_hadoop_branch') is not None:
-        inktank_branch = config.get('inktank_hadoop_branch')
-    else:
-        inktank_branch = 'cephfs/branch-1.0'  # default branch name
-
-    # replace any '/' with a '_' to match the artifact paths
-    inktank_branch = inktank_branch.replace('/', '_')
-    apache_branch = apache_branch.replace('/', '_')
-
-    with contextutil.nested(
-        lambda: validate_cluster(ctx=ctx),
-        lambda: binaries(ctx=ctx, config=dict(
-                tag=config.get('tag'),
-                sha1=config.get('sha1'),
-                path=config.get('path'),
-                flavor=flavor,
-                dist=config.get('dist', dist),
-                format=format_type,
-                arch=arch,
-                apache_branch=apache_branch,
-                inktank_branch=inktank_branch,
-                )),
-        lambda: configure_hadoop(ctx=ctx, config=config),
+    tasks = [
+        lambda: install_hadoop(ctx=ctx, config=config),
         lambda: start_hadoop(ctx=ctx, config=config),
-        lambda: out_of_safemode(ctx=ctx, config=config),
-        ):
+    ]
+
+    with contextutil.nested(*tasks):
         yield