-"""
-Hadoop task
-
-Install and cofigure hadoop -- requires that Ceph is already installed and
-already running.
-"""
from cStringIO import StringIO
import contextlib
import logging
-
from teuthology import misc as teuthology
from teuthology import contextutil
-from teuthology.parallel import parallel
+from teuthology.orchestra import run
log = logging.getLogger(__name__)
-
-@contextlib.contextmanager
-def validate_cluster(ctx):
- """
- Check that there is exactly one master and at least one slave configured
- """
- log.info('Vaidating Hadoop configuration')
- slaves = ctx.cluster.only(teuthology.is_type('hadoop.slave'))
-
- if (len(slaves.remotes) < 1):
- raise Exception("At least one hadoop.slave must be specified")
- else:
- log.info(str(len(slaves.remotes)) + " slaves specified")
-
- masters = ctx.cluster.only(teuthology.is_type('hadoop.master'))
- if (len(masters.remotes) == 1):
- pass
+HADOOP_2x_URL = "http://apache.osuosl.org/hadoop/common/hadoop-2.5.2/hadoop-2.5.2.tar.gz"
+
+def get_slaves_data(ctx):
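+ """Return the slaves file path and its contents: one line per hadoop.slave IP."""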
+ tempdir = teuthology.get_testdir(ctx)
+ path = "{tdir}/hadoop/etc/hadoop/slaves".format(tdir=tempdir)
+ nodes = ctx.cluster.only(teuthology.is_type('hadoop.slave'))
+ hosts = [s.ssh.get_transport().getpeername()[0] for s in nodes.remotes]
+ data = '\n'.join(hosts)
+ return path, data
+
+def get_masters_data(ctx):
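+ """Return the masters file path and its contents: one line per hadoop.master IP."""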
+ tempdir = teuthology.get_testdir(ctx)
+ path = "{tdir}/hadoop/etc/hadoop/masters".format(tdir=tempdir)
+ nodes = ctx.cluster.only(teuthology.is_type('hadoop.master'))
+ hosts = [s.ssh.get_transport().getpeername()[0] for s in nodes.remotes]
+ data = '\n'.join(hosts)
+ return path, data
+
+def get_core_site_data(ctx, config):
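+ """Return the core-site.xml path and contents, targeting HDFS or CephFS per the 'hdfs' config flag."""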
+ tempdir = teuthology.get_testdir(ctx)
+ path = "{tdir}/hadoop/etc/hadoop/core-site.xml".format(tdir=tempdir)
+ nodes = ctx.cluster.only(teuthology.is_type('hadoop.master'))
+ host = [s.ssh.get_transport().getpeername()[0] for s in nodes.remotes][0]
+
+ if config.get('hdfs', False):
+ data_tmpl = """
+<configuration>
+ <property>
+ <name>fs.defaultFS</name>
+ <value>hdfs://{namenode}:9000</value>
+ </property>
+ <property>
+ <name>hadoop.tmp.dir</name>
+ <value>{tdir}/hadoop_tmp</value>
+ </property>
+</configuration>
+"""
else:
- raise Exception(
- "Exactly one hadoop.master must be specified. Currently there are "
- + str(len(masters.remotes)))
-
- try:
- yield
+ data_tmpl = """
+<configuration>
+ <property>
+ <name>fs.default.name</name>
+ <value>ceph://{namenode}:6789/</value>
+ </property>
+ <property>
+ <name>fs.defaultFS</name>
+ <value>ceph://{namenode}:6789/</value>
+ </property>
+ <property>
+ <name>ceph.conf.file</name>
+ <value>/etc/ceph/ceph.conf</value>
+ </property>
+ <property>
+ <name>ceph.mon.address</name>
+ <value>{namenode}:6789</value>
+ </property>
+ <property>
+ <name>ceph.auth.id</name>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>ceph.data.pools</name>
+ <value>cephfs_data</value>
+ </property>
+ <property>
+ <name>fs.AbstractFileSystem.ceph.impl</name>
+ <value>org.apache.hadoop.fs.ceph.CephFs</value>
+ </property>
+ <property>
+ <name>fs.ceph.impl</name>
+ <value>org.apache.hadoop.fs.ceph.CephFileSystem</value>
+ </property>
+ <property>
+ <name>hadoop.tmp.dir</name>
+    <value>{tdir}/hadoop_tmp</value>
+ </property>
+</configuration>
+"""
+ return path, data_tmpl.format(tdir=tempdir, namenode=host)
- finally:
- pass
-
-
-def write_hadoop_env(ctx):
- """
- Add required entries to conf/hadoop-env.sh
- """
- hadoop_envfile = "{tdir}/apache_hadoop/conf/hadoop-env.sh".format(
- tdir=teuthology.get_testdir(ctx))
-
- hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
- for remote in hadoop_nodes.remotes:
- teuthology.write_file(remote, hadoop_envfile,
-'''export JAVA_HOME=/usr/lib/jvm/default-java
-export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/share/java/libcephfs.jar:{tdir}/apache_hadoop/build/hadoop-core*.jar:{tdir}/inktank_hadoop/build/hadoop-cephfs.jar
-export HADOOP_NAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_NAMENODE_OPTS"
-export HADOOP_SECONDARYNAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_SECONDARYNAMENODE_OPTS"
-export HADOOP_DATANODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_DATANODE_OPTS"
-export HADOOP_BALANCER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_BALANCER_OPTS"
-export HADOOP_JOBTRACKER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_JOBTRACKER_OPTS"
-'''.format(tdir=teuthology.get_testdir(ctx)))
- log.info("wrote file: " + hadoop_envfile + " to host: " + str(remote))
-
-
-def write_core_site(ctx, config):
- """
- Add required entries to conf/core-site.xml
- """
- testdir = teuthology.get_testdir(ctx)
- core_site_file = "{tdir}/apache_hadoop/conf/core-site.xml".format(
- tdir=testdir)
-
- hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
- for remote in hadoop_nodes.remotes:
-
- # check the config to see if we should use hdfs or ceph
- default_fs_string = ""
- if config.get('hdfs'):
- default_fs_string = 'hdfs://{master_ip}:54310'.format(
- master_ip=get_hadoop_master_ip(ctx))
- else:
- default_fs_string = 'ceph:///'
-
- teuthology.write_file(remote, core_site_file,
-'''<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!-- Put site-specific property overrides in this file. -->
+def get_mapred_site_data(ctx):
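+ """Return the mapred-site.xml path and contents, running MapReduce on YARN with the job tracker on the master."""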
+ data_tmpl = """
<configuration>
- <property>
- <name>hadoop.tmp.dir</name>
- <value>/tmp/hadoop/tmp</value>
- </property>
- <property>
- <name>fs.default.name</name>
- <value>{default_fs}</value>
- </property>
- <property>
- <name>ceph.conf.file</name>
- <value>/etc/ceph/ceph.conf</value>
- </property>
- <property>
- <name>fs.ceph.impl</name>
- <value>org.apache.hadoop.fs.ceph.CephFileSystem</value>
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>{namenode}:9001</value>
+ </property>
+ <property>
+ <name>mapreduce.framework.name</name>
+ <value>yarn</value>
</property>
</configuration>
-'''.format(tdir=teuthology.get_testdir(ctx), default_fs=default_fs_string))
-
- log.info("wrote file: " + core_site_file + " to host: " + str(remote))
-
-
-def get_hadoop_master_ip(ctx):
- """
- finds the hadoop.master in the ctx and then pulls out just the IP address
- """
- remote, _ = _get_master(ctx)
- master_name, master_port = remote.ssh.get_transport().getpeername()
- log.info('master name: {name} port {port}'.format(name=master_name,
- port=master_port))
- return master_name
-
-
-def write_mapred_site(ctx):
- """
- Add required entries to conf/mapred-site.xml
- """
- mapred_site_file = "{tdir}/apache_hadoop/conf/mapred-site.xml".format(
- tdir=teuthology.get_testdir(ctx))
-
- master_ip = get_hadoop_master_ip(ctx)
- log.info('adding host {remote} as jobtracker'.format(remote=master_ip))
-
- hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
- for remote in hadoop_nodes.remotes:
- teuthology.write_file(remote, mapred_site_file,
-'''<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!-- Put site-specific property overrides in this file. -->
+"""
+ tempdir = teuthology.get_testdir(ctx)
+ path = "{tdir}/hadoop/etc/hadoop/mapred-site.xml".format(tdir=tempdir)
+ nodes = ctx.cluster.only(teuthology.is_type('hadoop.master'))
+ hosts = [s.ssh.get_transport().getpeername()[0] for s in nodes.remotes]
+ assert len(hosts) == 1
+ host = hosts[0]
+ return path, data_tmpl.format(namenode=host)
+
+def get_yarn_site_data(ctx):
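+ """Return the yarn-site.xml path and contents, placing the ResourceManager on the master."""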
+ data_tmpl = """
+<configuration>
+<property>
+ <name>yarn.resourcemanager.resourcetracker.address</name>
+ <value>{namenode}:8025</value>
+</property>
+<property>
+ <name>yarn.resourcemanager.scheduler.address</name>
+ <value>{namenode}:8030</value>
+</property>
+<property>
+ <name>yarn.resourcemanager.address</name>
+ <value>{namenode}:8050</value>
+</property>
+<property>
+ <name>yarn.resourcemanager.admin.address</name>
+ <value>{namenode}:8041</value>
+</property>
+<property>
+ <name>yarn.resourcemanager.hostname</name>
+ <value>{namenode}</value>
+</property>
+<property>
+ <name>yarn.nodemanager.aux-services</name>
+ <value>mapreduce_shuffle</value>
+</property>
+</configuration>
+"""
+ tempdir = teuthology.get_testdir(ctx)
+ path = "{tdir}/hadoop/etc/hadoop/yarn-site.xml".format(tdir=tempdir)
+ nodes = ctx.cluster.only(teuthology.is_type('hadoop.master'))
+ hosts = [s.ssh.get_transport().getpeername()[0] for s in nodes.remotes]
+ assert len(hosts) == 1
+ host = hosts[0]
+ return path, data_tmpl.format(namenode=host)
+
+def get_hdfs_site_data(ctx):
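+ """Return the hdfs-site.xml path and contents (replication factor 1)."""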
+ data = """
<configuration>
- <property>
- <name>mapred.job.tracker</name>
- <value>{remote}:54311</value>
- </property>
+ <property>
+ <name>dfs.replication</name>
+ <value>1</value>
+ </property>
</configuration>
-'''.format(remote=master_ip))
+"""
+ tempdir = teuthology.get_testdir(ctx)
+ path = "{tdir}/hadoop/etc/hadoop/hdfs-site.xml".format(tdir=tempdir)
+ return path, data
+
+def configure(ctx, config, hadoops, hadoop_dir):
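+ """Write the slaves, masters, and *-site.xml config files to every Hadoop node and set JAVA_HOME."""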
+ tempdir = teuthology.get_testdir(ctx)
+
+ log.info("Writing Hadoop slaves file...")
+ for remote in hadoops.remotes:
+ path, data = get_slaves_data(ctx)
+ teuthology.write_file(remote, path, StringIO(data))
+
+ log.info("Writing Hadoop masters file...")
+ for remote in hadoops.remotes:
+ path, data = get_masters_data(ctx)
+ teuthology.write_file(remote, path, StringIO(data))
+
+ log.info("Writing Hadoop core-site.xml file...")
+ for remote in hadoops.remotes:
+ path, data = get_core_site_data(ctx, config)
+ teuthology.write_file(remote, path, StringIO(data))
+
+ log.info("Writing Hadoop yarn-site.xml file...")
+ for remote in hadoops.remotes:
+ path, data = get_yarn_site_data(ctx)
+ teuthology.write_file(remote, path, StringIO(data))
+
+ log.info("Writing Hadoop hdfs-site.xml file...")
+ for remote in hadoops.remotes:
+ path, data = get_hdfs_site_data(ctx)
+ teuthology.write_file(remote, path, StringIO(data))
+
+ log.info("Writing Hadoop mapred-site.xml file...")
+ for remote in hadoops.remotes:
+ path, data = get_mapred_site_data(ctx)
+ teuthology.write_file(remote, path, StringIO(data))
+
+ log.info("Setting JAVA_HOME in hadoop-env.sh...")
+ for remote in hadoops.remotes:
+ path = "{tdir}/hadoop/etc/hadoop/hadoop-env.sh".format(tdir=tempdir)
+ # default-java is the Debian JVM symlink; RHEL-family hosts use /usr/lib/jvm/java
+ if teuthology.get_system_type(remote) == 'rpm':
+ data = "JAVA_HOME=/usr/lib/jvm/java\n"
+ else:
+ data = "JAVA_HOME=/usr/lib/jvm/default-java\n"
+ teuthology.prepend_lines_to_file(remote, path, data)
- log.info("wrote file: " + mapred_site_file + " to host: " + str(remote))
+@contextlib.contextmanager
+def install_hadoop(ctx, config):
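+ """
+ Fetch and unpack the Hadoop tarball on every hadoop node, write the
+ configuration, and remove the install and temp directories on teardown.
+ """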
+ testdir = teuthology.get_testdir(ctx)
+ log.info("Downloading Hadoop...")
+ hadoop_tarball = "{tdir}/hadoop.tar.gz".format(tdir=testdir)
+ hadoops = ctx.cluster.only(teuthology.is_type('hadoop'))
+ run.wait(
+ hadoops.run(
+ args=[
+ 'wget',
+ '-nv',
+ '-O',
+ hadoop_tarball,
+ HADOOP_2x_URL
+ ],
+ wait=False,
+ )
+ )
-def write_hdfs_site(ctx):
- """
- Add required entries to conf/hdfs-site.xml
- """
- hdfs_site_file = "{tdir}/apache_hadoop/conf/hdfs-site.xml".format(
- tdir=teuthology.get_testdir(ctx))
+ log.info("Create directory for Hadoop install...")
+ hadoop_dir = "{tdir}/hadoop".format(tdir=testdir)
+ run.wait(
+ hadoops.run(
+ args=[
+ 'mkdir',
+ hadoop_dir
+ ],
+ wait=False,
+ )
+ )
- hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
- for remote in hadoop_nodes.remotes:
- teuthology.write_file(remote, hdfs_site_file,
-'''<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!-- Put site-specific property overrides in this file. -->
-<configuration>
- <property>
- <name>dfs.replication</name>
- <value>1</value>
- </property>
-</configuration>
-''')
- log.info("wrote file: " + hdfs_site_file + " to host: " + str(remote))
-
-
-def write_slaves(ctx):
- """
- Add required entries to conf/slaves
- These nodes host TaskTrackers and DataNodes
- """
- log.info('Setting up slave nodes...')
-
- slaves_file = "{tdir}/apache_hadoop/conf/slaves".format(
- tdir=teuthology.get_testdir(ctx))
- tmp_file = StringIO()
-
- slaves = ctx.cluster.only(teuthology.is_type('hadoop.slave'))
- for remote in slaves.remotes:
- tmp_file.write('{remote}\n'.format(
- remote=remote.ssh.get_transport().getpeername()[0]))
-
- tmp_file.seek(0)
-
- hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
- for remote in hadoop_nodes.remotes:
- teuthology.write_file(remote=remote, path=slaves_file, data=tmp_file)
- tmp_file.seek(0)
- log.info("wrote file: " + slaves_file + " to host: " + str(remote))
-
-
-def write_master(ctx):
- """
- Add required entries to conf/masters
- These nodes host JobTrackers and Namenodes
- """
- masters_file = "{tdir}/apache_hadoop/conf/masters".format(
- tdir=teuthology.get_testdir(ctx))
- master = _get_master(ctx)
- master_remote, _ = master
-
- hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
- for remote in hadoop_nodes.remotes:
- teuthology.write_file(remote, masters_file, '{master_host}\n'.format(
- master_host=master_remote.ssh.get_transport().getpeername()[0]))
- log.info("wrote file: " + masters_file + " to host: " + str(remote))
-
-
-def _configure_hadoop(ctx, config):
- """
- Call the various functions that configure Hadoop
- """
- log.info('writing out config files')
-
- write_hadoop_env(ctx)
- write_core_site(ctx, config)
- write_mapred_site(ctx)
- write_hdfs_site(ctx)
- write_slaves(ctx)
- write_master(ctx)
+ log.info("Unpacking Hadoop...")
+ run.wait(
+ hadoops.run(
+ args=[
+ 'tar',
+ 'xzf',
+ hadoop_tarball,
+ '--strip-components=1',
+ '-C',
+ hadoop_dir
+ ],
+ wait=False,
+ )
+ )
+ log.info("Removing Hadoop download...")
+ run.wait(
+ hadoops.run(
+ args=[
+ 'rm',
+ hadoop_tarball
+ ],
+ wait=False,
+ )
+ )
-@contextlib.contextmanager
-def configure_hadoop(ctx, config):
- """
- Call the various functions that configure Hadoop, and handle the
- startup of hadoop and clean up of temporary files if this is an hdfs.
- """
- _configure_hadoop(ctx, config)
- log.info('config.get(hdfs): {hdfs}'.format(hdfs=config.get('hdfs')))
-
- if config.get('hdfs'):
- log.info('hdfs option specified. Setting up hdfs')
-
- # let's run this from the master
- master = _get_master(ctx)
- remote, _ = master
- remote.run(
- args=["{tdir}/apache_hadoop/bin/hadoop".format(
- tdir=teuthology.get_testdir(ctx)),
- "namenode",
- "-format"],
- wait=True,
+ log.info("Create Hadoop temporary directory...")
+ hadoop_tmp_dir = "{tdir}/hadoop_tmp".format(tdir=testdir)
+ run.wait(
+ hadoops.run(
+ args=[
+ 'mkdir',
+ hadoop_tmp_dir
+ ],
+ wait=False,
+ )
)
- log.info('done setting up hadoop')
+ configure(ctx, config, hadoops, hadoop_dir)
try:
yield
-
finally:
- log.info('Removing hdfs directory')
run.wait(
- ctx.cluster.run(
- args=[
+ hadoops.run(
+ args=[
'rm',
'-rf',
- '/tmp/hadoop',
- ],
- wait=False,
- ),
+ hadoop_dir,
+ hadoop_tmp_dir
+ ],
+ wait=False,
+ )
)
-
-def _start_hadoop(ctx, remote, config):
- """
- remotely start hdfs if specified, and start mapred.
- """
+@contextlib.contextmanager
+def start_hadoop(ctx, config):
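+ """
+ Format the namenode, then start (and on teardown stop) the HDFS and
+ YARN daemons from the master.
+ """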
testdir = teuthology.get_testdir(ctx)
- if config.get('hdfs'):
- remote.run(
- args=['{tdir}/apache_hadoop/bin/start-dfs.sh'.format(
- tdir=testdir), ],
- wait=True,
+ hadoop_dir = "{tdir}/hadoop/".format(tdir=testdir)
+ masters = ctx.cluster.only(teuthology.is_type('hadoop.master'))
+ assert len(masters.remotes) == 1
+ master = masters.remotes.keys()[0]
+
+ log.info("Formatting HDFS...")
+ master.run(
+ args=[
+ hadoop_dir + "bin/hadoop",
+ "namenode",
+ "-format"
+ ],
+ wait=True,
)
- log.info('done starting hdfs')
-
- remote.run(
- args=['{tdir}/apache_hadoop/bin/start-mapred.sh'.format(
- tdir=testdir), ],
- wait=True,
- )
- log.info('done starting mapred')
-
-def _stop_hadoop(ctx, remote, config):
- """
- remotely stop mapred, and if hdfs if specified, stop the hdfs handler too.
- """
- testdir = teuthology.get_testdir(ctx)
- remote.run(
- args=['{tdir}/apache_hadoop/bin/stop-mapred.sh'.format(tdir=testdir), ],
- wait=True,
- )
-
- if config.get('hdfs'):
- remote.run(
- args=['{tdir}/apache_hadoop/bin/stop-dfs.sh'.format(
- tdir=testdir), ],
- wait=True,
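+ # stop any daemons left over from a previous run before starting fresh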
+ log.info("Stopping Hadoop daemons")
+ master.run(
+ args=[
+ hadoop_dir + "sbin/stop-yarn.sh"
+ ],
+ wait=True,
)
- log.info('done stopping hadoop')
-
-
-def _get_master(ctx):
- """
- Return the hadoop master. If more than one is found, fail an assertion
- """
- master = ctx.cluster.only(teuthology.is_type('hadoop.master'))
- assert 1 == len(master.remotes.items()), \
- 'There must be exactly 1 hadoop.master configured'
-
- return master.remotes.items()[0]
-
+ master.run(
+ args=[
+ hadoop_dir + "sbin/stop-dfs.sh"
+ ],
+ wait=True,
+ )
-@contextlib.contextmanager
-def start_hadoop(ctx, config):
- """
- Handle the starting and stopping of hadoop
- """
- master = _get_master(ctx)
- remote, _ = master
+ log.info("Starting HDFS...")
+ master.run(
+ args=[
+ hadoop_dir + "sbin/start-dfs.sh"
+ ],
+ wait=True,
+ )
- log.info('Starting hadoop on {remote}\n'.format(
- remote=remote.ssh.get_transport().getpeername()[0]))
- _start_hadoop(ctx, remote, config)
+ log.info("Starting YARN...")
+ master.run(
+ args=[
+ hadoop_dir + "sbin/start-yarn.sh"
+ ],
+ wait=True,
+ )
try:
yield
finally:
- log.info('Running stop-mapred.sh on {remote}'.format(
- remote=remote.ssh.get_transport().getpeername()[0]))
- _stop_hadoop(ctx, remote, config)
-
-
-def _download_apache_hadoop_bins(ctx, remote, hadoop_url):
- """
- download and untar the most recent apache hadoop binaries into
- {testdir}/apache_hadoop
- """
- log.info(
- '_download_apache_hadoop_bins: path {path} on host {host}'.format(
- path=hadoop_url, host=str(remote)))
- file_name = 'apache-hadoop.tgz'
- testdir = teuthology.get_testdir(ctx)
- remote.run(
- args=[
- 'mkdir', '-p', '-m0755',
- '{tdir}/apache_hadoop'.format(tdir=testdir),
- run.Raw('&&'),
- 'echo',
- '{file_name}'.format(file_name=file_name),
- run.Raw('|'),
- 'wget',
- '-nv',
- '-O-',
- '--base={url}'.format(url=hadoop_url),
- # need to use --input-file to make wget respect --base
- '--input-file=-',
- run.Raw('|'),
- 'tar', '-xzf', '-', '-C',
- '{tdir}/apache_hadoop'.format(tdir=testdir),
- ],
- )
-
-
-def _download_inktank_hadoop_bins(ctx, remote, hadoop_url):
- """
- download and untar the most recent Inktank hadoop binaries into
- {testdir}/hadoop
- """
- log.info(
- '_download_inktank_hadoop_bins: path {path} on host {host}'.format(
- path=hadoop_url, host=str(remote)))
- file_name = 'hadoop.tgz'
- testdir = teuthology.get_testdir(ctx)
- remote.run(
- args=[
- 'mkdir', '-p', '-m0755',
- '{tdir}/inktank_hadoop'.format(tdir=testdir),
- run.Raw('&&'),
- 'echo',
- '{file_name}'.format(file_name=file_name),
- run.Raw('|'),
- 'wget',
- '-nv',
- '-O-',
- '--base={url}'.format(url=hadoop_url),
- # need to use --input-file to make wget respect --base
- '--input-file=-',
- run.Raw('|'),
- 'tar', '-xzf', '-', '-C',
- '{tdir}/inktank_hadoop'.format(tdir=testdir),
- ],
- )
-
+ log.info("Stopping Hadoop daemons")
-def _copy_hadoop_cephfs_jars(ctx, remote, from_dir, to_dir):
- """
- copy hadoop-cephfs.jar and hadoop-cephfs-test.jar into apache_hadoop
- """
- testdir = teuthology.get_testdir(ctx)
- log.info('copy jars from {from_dir} to {to_dir} on host {host}'.format(
- from_dir=from_dir, to_dir=to_dir, host=str(remote)))
- file_names = ['hadoop-cephfs.jar', 'hadoop-cephfs-test.jar']
- for file_name in file_names:
- log.info('Copying file {file_name}'.format(file_name=file_name))
- remote.run(
- args=['cp', '{tdir}/{from_dir}/{file_name}'.format(
- tdir=testdir, from_dir=from_dir, file_name=file_name),
- '{tdir}/{to_dir}/'.format(tdir=testdir, to_dir=to_dir)
- ],
- )
-
-
-def _node_binaries(ctx, remote, inktank_hadoop_bindir_url,
- apache_hadoop_bindir_url):
- """
- Download and copy over the appropriate binaries and jar files.
- The calls from binaries() end up spawning this function on remote sites.
- """
- _download_inktank_hadoop_bins(ctx, remote, inktank_hadoop_bindir_url)
- _download_apache_hadoop_bins(ctx, remote, apache_hadoop_bindir_url)
- _copy_hadoop_cephfs_jars(ctx, remote, 'inktank_hadoop/build',
- 'apache_hadoop/build')
-
-
-@contextlib.contextmanager
-def binaries(ctx, config):
- """
- Fetch the binaries from the gitbuilder, and spawn the download tasks on
- the remote machines.
- """
- path = config.get('path')
-
- if path is None:
- # fetch Apache Hadoop from gitbuilder
- log.info(
- 'Fetching and unpacking Apache Hadoop binaries from gitbuilder...')
- apache_sha1, apache_hadoop_bindir_url = teuthology.get_ceph_binary_url(
- package='apache-hadoop',
- branch=config.get('apache_branch'),
- tag=config.get('tag'),
- sha1=config.get('sha1'),
- flavor=config.get('flavor'),
- format=config.get('format'),
- dist=config.get('dist'),
- arch=config.get('arch'),
+ master.run(
+ args=[
+ hadoop_dir + "sbin/stop-yarn.sh"
+ ],
+ wait=True,
)
- log.info('apache_hadoop_bindir_url %s' % (apache_hadoop_bindir_url))
- ctx.summary['apache-hadoop-sha1'] = apache_sha1
-
- # fetch Inktank Hadoop from gitbuilder
- log.info(
- 'Fetching and unpacking Inktank Hadoop binaries from gitbuilder...')
- inktank_sha1, inktank_hadoop_bindir_url = \
- teuthology.get_ceph_binary_url(
- package='hadoop',
- branch=config.get('inktank_branch'),
- tag=config.get('tag'),
- sha1=config.get('sha1'),
- flavor=config.get('flavor'),
- format=config.get('format'),
- dist=config.get('dist'),
- arch=config.get('arch'),
- )
- log.info('inktank_hadoop_bindir_url %s' % (inktank_hadoop_bindir_url))
- ctx.summary['inktank-hadoop-sha1'] = inktank_sha1
-
- else:
- raise Exception(
- "The hadoop task does not support the path argument at present")
-
- with parallel() as parallel_task:
- hadoop_nodes = ctx.cluster.only(teuthology.is_type('hadoop'))
- # these can happen independently
- for remote in hadoop_nodes.remotes.iterkeys():
- parallel_task.spawn(_node_binaries, ctx, remote,
- inktank_hadoop_bindir_url, apache_hadoop_bindir_url)
- try:
- yield
- finally:
- log.info('Removing hadoop binaries...')
- run.wait(
- ctx.cluster.run(
- args=['rm', '-rf', '--', '{tdir}/apache_hadoop'.format(
- tdir=teuthology.get_testdir(ctx))],
- wait=False,
- ),
+ master.run(
+ args=[
+ hadoop_dir + "sbin/stop-dfs.sh"
+ ],
+ wait=True,
)
+
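+ # finally, make sure no stray Java processes survive on any node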
run.wait(
ctx.cluster.run(
- args=['rm', '-rf', '--', '{tdir}/inktank_hadoop'.format(
- tdir=teuthology.get_testdir(ctx))],
- wait=False,
- ),
+ args=[
+ 'sudo',
+ 'skill',
+ '-9',
+ 'java'
+ ],
+ wait=False
+ )
)
-
-@contextlib.contextmanager
-def out_of_safemode(ctx, config):
- """
- A Hadoop NameNode will stay in safe mode for 30 seconds by default.
- This method blocks until the NameNode is out of safe mode.
- """
- if config.get('hdfs'):
- log.info('Waiting for the Namenode to exit safe mode...')
-
- master = _get_master(ctx)
- remote, _ = master
- remote.run(
- args=["{tdir}/apache_hadoop/bin/hadoop".format(
- tdir=teuthology.get_testdir(ctx)),
- "dfsadmin",
- "-safemode",
- "wait"],
- wait=True,
- )
- else:
- pass
-
- try:
- yield
- finally:
- pass
-
-
@contextlib.contextmanager
def task(ctx, config):
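+ """
+ Set up and tear down a Hadoop (2.x) cluster backed by CephFS, or by
+ HDFS when ``hdfs: True`` is set. Requires exactly one hadoop.master
+ role and at least one hadoop.slave. A minimal example (Ceph must be
+ installed and running for the default CephFS backend):
+
+ roles:
+ - [mon.0, mds.0, osd.0, hadoop.master.0]
+ - [mon.1, osd.1, hadoop.slave.0]
+
+ tasks:
+ - ceph:
+ - hadoop:
+ """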
- """
- Set up and tear down a Hadoop cluster.
-
- This depends on either having ceph installed prior to hadoop, like so:
-
- roles:
- - [mon.0, mds.0, osd.0, hadoop.master.0]
- - [mon.1, osd.1, hadoop.slave.0]
- - [mon.2, hadoop.slave.1]
-
- tasks:
- - ceph:
- - hadoop:
-
- Or if you want to use HDFS under Hadoop, this will configure Hadoop
- for HDFS and start it along with MapReduce. Note that it does not
- require Ceph be installed.
-
- roles:
- - [hadoop.master.0]
- - [hadoop.slave.0]
- - [hadoop.slave.1]
-
- tasks:
- - hadoop:
- hdfs: True
-
- This task requires exactly one hadoop.master be specified
- and at least one hadoop.slave.
-
- This does *not* do anything with the Hadoop setup. To run wordcount,
- you could use pexec like so (after the hadoop task):
-
- - pexec:
- hadoop.slave.0:
- - mkdir -p /tmp/hadoop_input
- - wget http://ceph.com/qa/hadoop_input_files.tar -O /tmp/hadoop_input/files.tar
- - cd /tmp/hadoop_input/; tar -xf /tmp/hadoop_input/files.tar
- - {tdir}/hadoop/bin/hadoop fs -mkdir wordcount_input
- - {tdir}/hadoop/bin/hadoop fs -put /tmp/hadoop_input/*txt wordcount_input/
- - {tdir}/hadoop/bin/hadoop jar {tdir}/hadoop/build/hadoop-example*jar wordcount wordcount_input wordcount_output
- - rm -rf /tmp/hadoop_input
-
- Note: {tdir} in the above example is the teuthology test directory.
- """
-
if config is None:
config = {}
- assert isinstance(config, dict), \
- "task hadoop only supports a dictionary for configuration"
-
- dist = 'precise'
- format_type = 'jar'
- arch = 'x86_64'
- flavor = config.get('flavor', 'basic')
-
- ctx.summary['flavor'] = flavor
+ assert isinstance(config, dict), "task hadoop config must be a dictionary"
overrides = ctx.config.get('overrides', {})
teuthology.deep_merge(config, overrides.get('hadoop', {}))
- apache_branch = None
- if config.get('apache_hadoop_branch') is not None:
- apache_branch = config.get('apache_hadoop_branch')
- else:
- apache_branch = 'branch-1.0' # hadoop branch to acquire
-
- inktank_branch = None
- if config.get('inktank_hadoop_branch') is not None:
- inktank_branch = config.get('inktank_hadoop_branch')
- else:
- inktank_branch = 'cephfs/branch-1.0' # default branch name
-
- # replace any '/' with a '_' to match the artifact paths
- inktank_branch = inktank_branch.replace('/', '_')
- apache_branch = apache_branch.replace('/', '_')
-
- with contextutil.nested(
- lambda: validate_cluster(ctx=ctx),
- lambda: binaries(ctx=ctx, config=dict(
- tag=config.get('tag'),
- sha1=config.get('sha1'),
- path=config.get('path'),
- flavor=flavor,
- dist=config.get('dist', dist),
- format=format_type,
- arch=arch,
- apache_branch=apache_branch,
- inktank_branch=inktank_branch,
- )),
- lambda: configure_hadoop(ctx=ctx, config=config),
+ tasks = [
+ lambda: install_hadoop(ctx=ctx, config=config),
lambda: start_hadoop(ctx=ctx, config=config),
- lambda: out_of_safemode(ctx=ctx, config=config),
- ):
+ ]
+
+ with contextutil.nested(*tasks):
yield