]> git-server-git.apps.pok.os.sepia.ceph.com Git - teuthology.git/commitdiff
Adding a Hadoop task.
authorJoe Buck <jbbuck@gmail.com>
Thu, 6 Dec 2012 22:19:55 +0000 (14:19 -0800)
committerJoe Buck <jbbuck@gmail.com>
Tue, 11 Dec 2012 22:07:28 +0000 (14:07 -0800)
This task configures and starts a Hadoop cluster.
It does not run any jobs, that must be done after
this task runs.
Can run on either Ceph or HDFS.

Signed-off-by: Joe Buck <jbbuck@gmail.com>
teuthology/task/hadoop.py [new file with mode: 0644]

diff --git a/teuthology/task/hadoop.py b/teuthology/task/hadoop.py
new file mode 100644 (file)
index 0000000..38b88e4
--- /dev/null
@@ -0,0 +1,432 @@
+from cStringIO import StringIO
+
+import contextlib
+import logging
+import os
+import re
+
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology.parallel import parallel
+from ..orchestra import run
+
+log = logging.getLogger(__name__)
+
+###################
+# This installeds and configures Hadoop, but requires that Ceph is already installed and running.
+##################
+
+## Check that there is exactly one master and at least one slave configured
+@contextlib.contextmanager
+def validate_config(ctx, config):
+    log.info('Vaidating Hadoop configuration')
+    slaves = ctx.cluster.only(teuthology.is_type('hadoop.slave'))
+
+    if (len(slaves.remotes) < 1):
+        raise Exception("At least one hadoop.slave must be specified")
+    else:
+        log.info(str(len(slaves.remotes)) + " slaves specified")
+
+    masters = ctx.cluster.only(teuthology.is_type('hadoop.master'))
+    if (len(masters.remotes) == 1):
+        pass
+    else:
+        raise Exception("Exactly one hadoop.master must be specified. Currently there are " + str(len(masters.remotes)))
+
+    try: 
+        yield
+
+    finally:
+        pass
+
+## Add required entries to conf/hadoop-env.sh
+def write_hadoop_env(ctx, config):
+    hadoopEnvFile = "/tmp/cephtest/hadoop/conf/hadoop-env.sh"
+
+    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
+        teuthology.write_file(remote, hadoopEnvFile, 
+'''export JAVA_HOME=/usr/lib/jvm/default-java
+export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp/cephtest/binary/usr/local/lib:/usr/lib
+export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/tmp/cephtest/binary/usr/local/lib/libcephfs.jar:/tmp/cephtest/hadoop/build/hadoop-core*.jar
+export HADOOP_NAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_NAMENODE_OPTS"
+export HADOOP_SECONDARYNAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_SECONDARYNAMENODE_OPTS"
+export HADOOP_DATANODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_DATANODE_OPTS"
+export HADOOP_BALANCER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_BALANCER_OPTS"
+export HADOOP_JOBTRACKER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_JOBTRACKER_OPTS"
+'''     )
+        log.info("wrote file: " + hadoopEnvFile + " to host: " + str(remote))
+
+## Add required entries to conf/core-site.xml
+def write_core_site(ctx, config):
+    coreSiteFile = "/tmp/cephtest/hadoop/conf/core-site.xml" 
+
+    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
+
+        # check the config to see if we should use hdfs or ceph
+        default_fs_string = ""
+        if config.get('hdfs'):
+            default_fs_string = 'hdfs://{master_ip}:54310'.format(master_ip=get_hadoop_master_ip(ctx))
+        else:
+            default_fs_string = 'ceph:///'
+
+        teuthology.write_file(remote, coreSiteFile, 
+'''<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!-- Put site-specific property overrides in this file.  -->
+<configuration>
+    <property>
+        <name>hadoop.tmp.dir</name>
+        <value>/tmp/hadoop/tmp</value>
+    </property>
+    <property>
+        <name>fs.default.name</name>
+        <value>{default_fs}</value>
+    </property>
+    <property>
+        <name>ceph.conf.file</name>
+        <value>/tmp/cephtest/ceph.conf</value>
+    </property>
+</configuration>
+'''.format(default_fs=default_fs_string))
+
+        log.info("wrote file: " + coreSiteFile + " to host: " + str(remote))
+
+## finds the hadoop.master in the ctx and then pulls out just the IP address
+def get_hadoop_master_ip(ctx):
+    remote, _ = _get_master(ctx)
+    master_name,master_port= remote.ssh.get_transport().getpeername()
+    log.info('master name: {name} port {port}'.format(name=master_name,port=master_port))
+    return master_name
+
+## Add required entries to conf/mapred-site.xml
+def write_mapred_site(ctx):
+    mapredSiteFile = "/tmp/cephtest/hadoop/conf/mapred-site.xml"
+
+    master_ip = get_hadoop_master_ip(ctx)
+    log.info('adding host {remote} as jobtracker'.format(remote=master_ip))
+
+    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
+        teuthology.write_file(remote, mapredSiteFile, 
+'''<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!-- Put site-specific property overrides in this file. -->
+<configuration>
+    <property>
+        <name>mapred.job.tracker</name>
+        <value>{remote}:54311</value>
+    </property>
+</configuration>  
+'''.format(remote=master_ip))
+
+        log.info("wrote file: " + mapredSiteFile + " to host: " + str(remote))
+
+## Add required entries to conf/hdfs-site.xml
+def write_hdfs_site(ctx):
+    hdfsSiteFile = "/tmp/cephtest/hadoop/conf/hdfs-site.xml"
+
+    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
+        teuthology.write_file(remote, hdfsSiteFile, 
+'''<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!-- Put site-specific property overrides in this file. -->
+<configuration>
+    <property>
+        <name>dfs.replication</name>
+        <value>1</value>
+    </property>
+</configuration>
+'''     )
+        log.info("wrote file: " + hdfsSiteFile + " to host: " + str(remote))
+
+## Add required entries to conf/slaves 
+## These nodes host TaskTrackers and DataNodes
+def write_slaves(ctx):
+    log.info('Setting up slave nodes...')
+
+    slavesFile = "/tmp/cephtest/hadoop/conf/slaves"
+    tmpFile = StringIO()
+
+    slaves = ctx.cluster.only(teuthology.is_type('hadoop.slave'))
+    for remote, roles_for_host in slaves.remotes.iteritems():
+        tmpFile.write('{remote}\n'.format(remote=remote.ssh.get_transport().getpeername()[0]))
+
+    tmpFile.seek(0)
+
+    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
+        teuthology.write_file(remote=remote, path=slavesFile, data=tmpFile)
+        tmpFile.seek(0)
+        log.info("wrote file: " + slavesFile + " to host: " + str(remote))
+
+## Add required entries to conf/masters 
+## These nodes host JobTrackers and Namenodes
+def write_master(ctx):
+    mastersFile = "/tmp/cephtest/hadoop/conf/masters"
+    master = _get_master(ctx)
+    remote, _ = master
+
+
+    hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+    for remote, roles_for_host in hadoopNodes.remotes.iteritems():
+        teuthology.write_file(remote, mastersFile, '{remote}\n'.format(remote=remote.ssh.get_transport().getpeername()[0]))
+        log.info("wrote file: " + mastersFile + " to host: " + str(remote))
+
+## Call the various functions that configure Hadoop
+def _configure_hadoop(ctx, config):
+    log.info('writing out config files')
+
+    write_hadoop_env(ctx, config)
+    write_core_site(ctx, config)
+    write_mapred_site(ctx)
+    write_hdfs_site(ctx)
+    write_slaves(ctx)
+    write_master(ctx)
+
+
+
+@contextlib.contextmanager
+def configure_hadoop(ctx, config):
+    _configure_hadoop(ctx,config)
+
+    log.info('config.get(hdfs): {hdfs}'.format(hdfs=config.get('hdfs')))
+
+    if config.get('hdfs'):
+        log.info('hdfs option specified. Setting up hdfs')
+
+        # let's run this from the master
+        master = _get_master(ctx)
+        remote, _ = master
+        remote.run(
+        args=["/tmp/cephtest/hadoop/bin/hadoop","namenode","-format"],
+            wait=True,
+        )
+
+    log.info('done setting up hadoop')
+
+    try: 
+        yield
+
+    finally:
+        log.info('Removing hdfs directory')
+        run.wait(
+            ctx.cluster.run(
+                args=[
+                    'rm',
+                    '-rf',
+                    '/tmp/hadoop',
+                    ],
+                wait=False,
+                ),
+            )
+
+def _start_hadoop(remote, config):
+    if config.get('hdfs'):
+        remote.run(
+            args=['/tmp/cephtest/hadoop/bin/start-dfs.sh', ],
+            wait=True,
+        )
+        log.info('done starting hdfs')
+
+    remote.run(
+        args=['/tmp/cephtest/hadoop/bin/start-mapred.sh', ], 
+        wait=True,
+    )
+    log.info('done starting mapred')
+
+
+def _stop_hadoop(remote, config):
+    remote.run(
+        args=['/tmp/cephtest/hadoop/bin/stop-mapred.sh', ],
+        wait=True,
+    )
+
+    if config.get('hdfs'):
+        remote.run(
+            args=['/tmp/cephtest/hadoop/bin/stop-dfs.sh', ],
+            wait=True,
+        )
+
+    log.info('done stopping hadoop')
+
+def _get_master(ctx):
+    master = ctx.cluster.only(teuthology.is_type('hadoop.master'))
+    assert 1 == len(master.remotes.items()), 'There must be exactly 1 hadoop.master configured'
+
+    return master.remotes.items()[0]
+
+@contextlib.contextmanager
+def start_hadoop(ctx, config):
+    master = _get_master(ctx)
+    remote, _ = master
+
+    log.info('Starting hadoop on {remote}\n'.format(remote=remote.ssh.get_transport().getpeername()[0]))
+    _start_hadoop(remote, config)
+
+    try: 
+        yield
+
+    finally:
+        log.info('Running stop-mapred.sh on {remote}'.format(remote=remote.ssh.get_transport().getpeername()[0]))
+        _stop_hadoop(remote, config)
+
+# download and untar the most recent hadoop binaries into /tmp/cephtest/hadoop
+def _download_hadoop_binaries(remote, hadoop_url):
+    log.info('_download_hadoop_binaries: path %s' % hadoop_url)
+    fileName = 'hadoop.tgz'
+    remote.run(
+        args=[
+            'mkdir', '-p', '-m0755', '/tmp/cephtest/hadoop',
+            run.Raw('&&'),
+            'echo',
+            '{fileName}'.format(fileName=fileName),
+            run.Raw('|'),
+            'wget',
+            '-nv',
+            '-O-',
+            '--base={url}'.format(url=hadoop_url),
+            # need to use --input-file to make wget respect --base
+            '--input-file=-',
+            run.Raw('|'),
+            'tar', '-xzf', '-', '-C', '/tmp/cephtest/hadoop',
+        ],
+    )
+
+@contextlib.contextmanager
+def binaries(ctx, config):
+    path = config.get('path')
+    tmpdir = None
+
+    if path is None:
+        # fetch from gitbuilder gitbuilder
+        log.info('Fetching and unpacking hadoop binaries from gitbuilder...')
+        sha1, hadoop_bindir_url = teuthology.get_ceph_binary_url(
+            package='hadoop',
+            branch=config.get('branch'),
+            tag=config.get('tag'),
+            sha1=config.get('sha1'),
+            flavor=config.get('flavor'),
+            format=config.get('format'),
+            dist=config.get('dist'),
+            arch=config.get('arch'),
+            )
+        log.info('hadoop_bindir_url %s' % (hadoop_bindir_url))
+        ctx.summary['ceph-sha1'] = sha1
+        if ctx.archive is not None:
+            with file(os.path.join(ctx.archive, 'ceph-sha1'), 'w') as f:
+                f.write(sha1 + '\n')
+
+    with parallel() as p:
+        hadoopNodes = ctx.cluster.only(teuthology.is_type('hadoop'))
+        for remote in hadoopNodes.remotes.iterkeys():
+            p.spawn(_download_hadoop_binaries, remote, hadoop_bindir_url)
+
+    try:
+        yield
+    finally:
+        log.info('Removing hadoop binaries...')
+        run.wait(
+            ctx.cluster.run(
+                args=[ 'rm', '-rf', '--', '/tmp/cephtest/hadoop'],
+                wait=False,
+                ),
+            )
+
+## A Hadoop NameNode will stay in safe mode for 30 seconds by default.
+## This method blocks until the NameNode is out of safe mode.
+@contextlib.contextmanager
+def out_of_safemode(ctx, config):
+
+    if config.get('hdfs'):
+        log.info('Waiting for the Namenode to exit safe mode...')
+
+        master = _get_master(ctx)
+        remote, _ = master
+        remote.run(
+            args=["/tmp/cephtest/hadoop/bin/hadoop","dfsadmin","-safemode", "wait"],
+            wait=True,
+        )
+    else:
+        pass
+
+    try:
+        yield
+    finally:
+        pass
+
+@contextlib.contextmanager
+def task(ctx, config):
+    """
+    Set up and tear down a Hadoop cluster.
+
+    This depends on either having ceph installed prior to hadoop, like so:
+
+    roles:
+    - [mon.0, mds.0, osd.0, hadoop.master.0]
+    - [mon.1, osd.1, hadoop.slave.0]
+    - [mon.2, hadoop.slave.1]
+
+    tasks:
+    - ceph:
+    - hadoop:
+
+    Or if you want to use HDFS under Hadoop, this will configure Hadoop
+    for HDFS and start it along with MapReduce. Note that it does not 
+    require Ceph be installed.
+
+    roles:
+    - [hadoop.master.0]
+    - [hadoop.slave.0]
+    - [hadoop.slave.1]
+
+    tasks:
+    - hadoop:
+        hdfs: True
+
+    This task requires exactly one hadoop.master be specified 
+    and at least one hadoop.slave.
+
+    This does *not* do anything with the Hadoop setup. To run wordcount, 
+    you could use pexec like so (after the hadoop task):
+
+    - pexec: 
+        hadoop.slave.0:
+          - mkdir -p /tmp/hadoop_input
+          - wget http://ceph.com/qa/hadoop_input_files.tar -O /tmp/hadoop_input/files.tar
+          - cd /tmp/hadoop_input/; tar -xf /tmp/hadoop_input/files.tar
+          - /tmp/cephtest/hadoop/bin/hadoop fs -mkdir wordcount_input 
+          - /tmp/cephtest/hadoop/bin/hadoop fs -put /tmp/hadoop_input/*txt wordcount_input/ 
+          - /tmp/cephtest/hadoop/bin/hadoop jar /tmp/cephtest/hadoop/build/hadoop-example*jar wordcount wordcount_input wordcount_output  
+          - rm -rf /tmp/hadoop_input
+    """
+    dist = 'precise'
+    format = 'jar'
+    arch = 'x86_64'
+    flavor = 'basic'
+    branch = 'cephfs_branch-1.0' # hadoop branch to acquire
+
+    if config is None:
+        config = {}
+    assert isinstance(config, dict), \
+        "task hadoop only supports a dictionary for configuration"
+
+    with contextutil.nested(
+        lambda: validate_config(ctx=ctx, config=config),
+        lambda: binaries(ctx=ctx, config=dict(
+                branch=branch,
+                tag=config.get('tag'),
+                sha1=config.get('sha1'),
+                path=config.get('path'),
+                flavor=flavor,
+                dist=config.get('dist', dist),
+                format=format,
+                arch=arch
+                )),
+        lambda: configure_hadoop(ctx=ctx, config=config),
+        lambda: start_hadoop(ctx=ctx, config=config),
+        lambda: out_of_safemode(ctx=ctx, config=config),
+        ):
+        yield