qa: Kafka task files for bucket notification tests
author    root <root@localhost.localdomain>
Mon, 28 Dec 2020 12:41:15 +0000 (18:11 +0530)
committer root <root@localhost.localdomain>
Wed, 3 Mar 2021 16:34:04 +0000 (22:04 +0530)
This commit consists of three parts:
1. Files required to set up the new suite directory (so the task can run in teuthology)
2. The Kafka task file
3. New files containing the tests and their infrastructure, separating the bucket notification tests from the pubsub tests

Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
22 files changed:
qa/suites/rgw/notifications/% [new file with mode: 0644]
qa/suites/rgw/notifications/centos_latest.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/clusters/fixed-2.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/frontend/civetweb.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/objectstore/bluestore-bitmap.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/objectstore/filestore-xfs.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/overrides.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/rgw_pool_type/ec-profile.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/rgw_pool_type/ec.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/rgw_pool_type/replicated.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/tasks/+ [new file with mode: 0644]
qa/suites/rgw/notifications/tasks/0-install.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/tasks/test_kafka.yaml [new file with mode: 0644]
qa/tasks/kafka.py [new file with mode: 0644]
qa/tasks/notification_tests.py [new file with mode: 0644]
src/test/rgw/bucket_notification/README.rst
src/test/rgw/bucket_notification/__init__.py
src/test/rgw/bucket_notification/bntests.conf.SAMPLE [new file with mode: 0644]
src/test/rgw/bucket_notification/bootstrap [new file with mode: 0755]
src/test/rgw/bucket_notification/requirements.txt [new file with mode: 0644]
src/test/rgw/bucket_notification/setup.py [new file with mode: 0644]
src/test/rgw/bucket_notification/test_bn.py

diff --git a/qa/suites/rgw/notifications/% b/qa/suites/rgw/notifications/%
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/qa/suites/rgw/notifications/centos_latest.yaml b/qa/suites/rgw/notifications/centos_latest.yaml
new file mode 100644 (file)
index 0000000..24ae800
--- /dev/null
@@ -0,0 +1,6 @@
+os_type: centos
+os_version: "8.1"
+overrides:
+  selinux:
+    whitelist:
+      - scontext=system_u:system_r:logrotate_t:s0
diff --git a/qa/suites/rgw/notifications/clusters/fixed-2.yaml b/qa/suites/rgw/notifications/clusters/fixed-2.yaml
new file mode 100644 (file)
index 0000000..e4448bb
--- /dev/null
@@ -0,0 +1,12 @@
+roles:
+- [mon.a, mon.c, mgr.y, osd.0, osd.1, osd.2, osd.3, client.0, node-exporter.a]
+- [mon.b, mgr.x, osd.4, osd.5, osd.6, osd.7, client.1, prometheus.a, node-exporter.b]
+openstack:
+- volumes: # attached to each instance
+    count: 4
+    size: 10 # GB
+overrides:
+  ceph:
+    conf:
+      osd:
+        osd shutdown pgref assert: true
diff --git a/qa/suites/rgw/notifications/frontend/civetweb.yaml b/qa/suites/rgw/notifications/frontend/civetweb.yaml
new file mode 100644 (file)
index 0000000..57e8cc4
--- /dev/null
@@ -0,0 +1,3 @@
+overrides:
+  rgw:
+    frontend: civetweb
diff --git a/qa/suites/rgw/notifications/objectstore/bluestore-bitmap.yaml b/qa/suites/rgw/notifications/objectstore/bluestore-bitmap.yaml
new file mode 100644 (file)
index 0000000..69c37ac
--- /dev/null
@@ -0,0 +1,42 @@
+overrides:
+  thrashosds:
+    bdev_inject_crash: 2
+    bdev_inject_crash_probability: .5
+  ceph:
+    fs: xfs
+    conf:
+      osd:
+        osd objectstore: bluestore
+        bluestore block size: 96636764160
+        debug bluestore: 20
+        debug bluefs: 20
+        debug rocksdb: 10
+        bluestore fsck on mount: true
+        bluestore allocator: bitmap
+        # lower the full ratios since we can fill up a 100gb osd so quickly
+        mon osd full ratio: .9
+        mon osd backfillfull_ratio: .85
+        mon osd nearfull ratio: .8
+        osd failsafe full ratio: .95
+# this doesn't work with failures bc the log writes are not atomic across the two backends
+#        bluestore bluefs env mirror: true
+        bdev enable discard: true
+        bdev async discard: true
+  ceph-deploy:
+    fs: xfs
+    bluestore: yes
+    conf:
+      osd:
+        osd objectstore: bluestore
+        bluestore block size: 96636764160
+        debug bluestore: 20
+        debug bluefs: 20
+        debug rocksdb: 10
+        bluestore fsck on mount: true
+        # lower the full ratios since we can fill up a 100gb osd so quickly
+        mon osd full ratio: .9
+        mon osd backfillfull_ratio: .85
+        mon osd nearfull ratio: .8
+        osd failsafe full ratio: .95
+        bdev enable discard: true
+        bdev async discard: true
diff --git a/qa/suites/rgw/notifications/objectstore/filestore-xfs.yaml b/qa/suites/rgw/notifications/objectstore/filestore-xfs.yaml
new file mode 100644 (file)
index 0000000..bd18aca
--- /dev/null
@@ -0,0 +1,14 @@
+overrides:
+  ceph:
+    fs: xfs
+    conf:
+      osd:
+        osd objectstore: filestore
+        osd sloppy crc: true
+  ceph-deploy:
+    fs: xfs
+    filestore: True
+    conf:
+      osd:
+        osd objectstore: filestore
+        osd sloppy crc: true
diff --git a/qa/suites/rgw/notifications/overrides.yaml b/qa/suites/rgw/notifications/overrides.yaml
new file mode 100644 (file)
index 0000000..1cb4890
--- /dev/null
@@ -0,0 +1,13 @@
+overrides:
+  ceph:
+    wait-for-scrub: false
+    conf:
+      client:
+        setuser: ceph
+        setgroup: ceph
+        debug rgw: 20
+        rgw crypt s3 kms backend: testing
+        rgw crypt s3 kms encryption keys: testkey-1=YmluCmJvb3N0CmJvb3N0LWJ1aWxkCmNlcGguY29uZgo= testkey-2=aWIKTWFrZWZpbGUKbWFuCm91dApzcmMKVGVzdGluZwo=
+        rgw crypt require ssl: false
+  rgw:
+    storage classes: LUKEWARM, FROZEN
diff --git a/qa/suites/rgw/notifications/rgw_pool_type/ec-profile.yaml b/qa/suites/rgw/notifications/rgw_pool_type/ec-profile.yaml
new file mode 100644 (file)
index 0000000..f6fbf35
--- /dev/null
@@ -0,0 +1,8 @@
+overrides:
+  rgw:
+    ec-data-pool: true
+    erasure_code_profile:
+      name: testprofile
+      k: 3
+      m: 1
+      crush-failure-domain: osd
diff --git a/qa/suites/rgw/notifications/rgw_pool_type/ec.yaml b/qa/suites/rgw/notifications/rgw_pool_type/ec.yaml
new file mode 100644 (file)
index 0000000..7c0c5e6
--- /dev/null
@@ -0,0 +1,3 @@
+overrides:
+  rgw:
+    ec-data-pool: true
diff --git a/qa/suites/rgw/notifications/rgw_pool_type/replicated.yaml b/qa/suites/rgw/notifications/rgw_pool_type/replicated.yaml
new file mode 100644 (file)
index 0000000..c91709e
--- /dev/null
@@ -0,0 +1,3 @@
+overrides:
+  rgw:
+    ec-data-pool: false
diff --git a/qa/suites/rgw/notifications/tasks/+ b/qa/suites/rgw/notifications/tasks/+
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/qa/suites/rgw/notifications/tasks/0-install.yaml b/qa/suites/rgw/notifications/tasks/0-install.yaml
new file mode 100644 (file)
index 0000000..5ebe672
--- /dev/null
@@ -0,0 +1,20 @@
+# see http://tracker.ceph.com/issues/20360 and http://tracker.ceph.com/issues/18126
+os_type: centos
+
+tasks:
+- install:
+#    flavor: notcmalloc
+- ceph:
+- openssl_keys:
+- rgw:
+    client.0:
+#      valgrind: [--tool=memcheck, --max-threads=1024] # http://tracker.ceph.com/issues/25214
+
+overrides:
+  ceph:
+    conf:
+      global:
+        osd_min_pg_log_entries: 10
+        osd_max_pg_log_entries: 10
+      client:
+        rgw lc debug interval: 10
diff --git a/qa/suites/rgw/notifications/tasks/test_kafka.yaml b/qa/suites/rgw/notifications/tasks/test_kafka.yaml
new file mode 100644 (file)
index 0000000..1fb7c37
--- /dev/null
@@ -0,0 +1,9 @@
+tasks:
+- tox: [ client.0 ]
+- kafka:
+    client.0:
+- notification-tests:
+    client.0:
+      force-branch: wip-rgw-bucket-tests-separation-new
+      rgw_server: client.0
+      git_remote: https://github.com/TRYTOBE8TME/
diff --git a/qa/tasks/kafka.py b/qa/tasks/kafka.py
new file mode 100644 (file)
index 0000000..d549f34
--- /dev/null
@@ -0,0 +1,203 @@
+"""
+Deploy and configure Kafka for Teuthology
+"""
+from io import BytesIO
+from io import StringIO
+from configobj import ConfigObj
+import base64
+import argparse
+import contextlib
+import logging
+import os
+import random
+import six
+import string
+import subprocess
+import json
+import sys
+from pathlib import Path
+
+from collections import OrderedDict
+from itertools import chain
+
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology.config import config as teuth_config
+from teuthology.orchestra import run
+from teuthology.packaging import install_package
+from teuthology.packaging import remove_package
+from teuthology.exceptions import ConfigError
+
+log = logging.getLogger(__name__)
+
+def get_kafka_dir(ctx):
+    return '{tdir}/kafka-2.6.0-src'.format(tdir=teuthology.get_testdir(ctx))
+
+def run_in_kafka_dir(ctx, client, args, **kwargs):
+    return ctx.cluster.only(client).run(
+        args=[ 'cd', get_kafka_dir(ctx), run.Raw('&&'), ] + args,
+        **kwargs
+    )
+
+def get_toxvenv_dir(ctx):
+    return ctx.tox.venv_path
+
+def toxvenv_sh(ctx, remote, args, **kwargs):
+    activate = get_toxvenv_dir(ctx) + '/bin/activate'
+    return remote.sh(['source', activate, run.Raw('&&')] + args, **kwargs)
+
+@contextlib.contextmanager
+def install_kafka(ctx, config):
+    """
+    Download and unpack the Kafka source tarball.
+    """
+    assert isinstance(config, dict)
+    log.info('Installing Kafka...')
+
+    for (client, _) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+        test_dir=teuthology.get_testdir(ctx)
+        toxvenv_sh(ctx, remote, ['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', 'https://archive.apache.org/dist/kafka/2.6.0/kafka-2.6.0-src.tgz'])
+        toxvenv_sh(ctx, remote, ['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', 'kafka-2.6.0-src.tgz'])
+
+    try:
+        yield
+    finally:
+        log.info('Removing the unpacked Kafka source tree...')
+        test_dir=get_kafka_dir(ctx)
+        for client in config:
+            ctx.cluster.only(client).run(
+                args=['rm', '-rf', test_dir],
+            )
+
+
+@contextlib.contextmanager
+def run_kafka(ctx,config):
+    """
+    This includes two parts:
+    1. Starting Zookeeper service
+    2. Starting Kafka service
+    """
+    assert isinstance(config, dict)
+    log.info('Bringing up Zookeeper and Kafka services...')
+    for (client,_) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+
+        toxvenv_sh(ctx, remote,
+            ['cd', '{tdir}'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+             './gradlew', 'jar', 
+             '-PscalaVersion=2.13.2'
+            ],
+        )
+
+        toxvenv_sh(ctx, remote,
+            ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+             './zookeeper-server-start.sh',
+             '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx)),
+             run.Raw('&'), 'exit'
+            ],
+        )
+
+        toxvenv_sh(ctx, remote,
+            ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+             './kafka-server-start.sh',
+             '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx)),
+             run.Raw('&'), 'exit'
+            ],
+        )
+
+    try:
+        yield
+    finally:
+        log.info('Stopping Zookeeper and Kafka Services...')
+
+        for (client, _) in config.items():
+            (remote,) = ctx.cluster.only(client).remotes.keys()
+
+            toxvenv_sh(ctx, remote, 
+                ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+                 './kafka-server-stop.sh',  
+                 '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx)),
+                ]
+            )
+
+            toxvenv_sh(ctx, remote, 
+                ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'), 
+                 './zookeeper-server-stop.sh',
+                 '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx)),
+                ]
+            )
+
+@contextlib.contextmanager
+def run_admin_cmds(ctx,config):
+    """
+    Run Kafka admin commands to verify topic creation and the producer/consumer workflow.
+    """
+    assert isinstance(config, dict)
+    log.info('Checking kafka server through producer/consumer commands...')
+    for (client,_) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+
+        toxvenv_sh(ctx, remote,
+            [
+                'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'), 
+                './kafka-topics.sh', '--create', '--topic', 'quickstart-events',
+                '--bootstrap-server', 'localhost:9092'
+            ])
+
+        toxvenv_sh(ctx, remote,
+            [
+                'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+                'echo', "First", run.Raw('|'),
+                './kafka-console-producer.sh', '--topic', 'quickstart-events',
+                '--bootstrap-server', 'localhost:9092'
+            ])
+
+        toxvenv_sh(ctx, remote,
+            [
+                'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+                './kafka-console-consumer.sh', '--topic', 'quickstart-events',
+                '--from-beginning',
+                '--bootstrap-server', 'localhost:9092',
+                run.Raw('&'), 'exit'
+            ])
+
+    try:
+        yield
+    finally:
+        pass
+
+
+@contextlib.contextmanager
+def task(ctx,config):
+    """
+    The tox task must run before the kafka task. Run tox
+    and then kafka like this::
+    tasks:
+    - tox: [ client.0 ]
+    - kafka:
+        client.0:
+    """
+    assert config is None or isinstance(config, list) \
+        or isinstance(config, dict), \
+        "task kafka only supports a list or dictionary for configuration"
+
+    if not hasattr(ctx, 'tox'):
+        raise ConfigError('kafka must run after the tox task')
+
+    all_clients = ['client.{id}'.format(id=id_)
+                   for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
+    if config is None:
+        config = all_clients
+    if isinstance(config, list):
+        config = dict.fromkeys(config)
+    clients=config.keys()
+
+    log.debug('Kafka config is %s', config)
+
+    with contextutil.nested(
+        lambda: install_kafka(ctx=ctx, config=config),
+        lambda: run_kafka(ctx=ctx, config=config),
+        lambda: run_admin_cmds(ctx=ctx, config=config),
+        ):
+        yield
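
The three context managers above are composed by contextutil.nested(), so setup runs top to bottom and teardown runs in reverse: the admin-command check and the Kafka/Zookeeper services are torn down before install_kafka()'s finalizer removes the unpacked source tree. A minimal, self-contained sketch of that ordering using plain contextlib (an illustration, not teuthology code)::

        import contextlib

        @contextlib.contextmanager
        def stage(name):
            # stand-in for install_kafka / run_kafka / run_admin_cmds
            print('setup:', name)
            try:
                yield
            finally:
                print('teardown:', name)

        with contextlib.ExitStack() as stack:
            for name in ('install_kafka', 'run_kafka', 'run_admin_cmds'):
                stack.enter_context(stage(name))
            print('downstream tasks (e.g. notification-tests) run here')
        # teardown prints in reverse order: run_admin_cmds, run_kafka, install_kafka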
diff --git a/qa/tasks/notification_tests.py b/qa/tasks/notification_tests.py
new file mode 100644 (file)
index 0000000..774e42c
--- /dev/null
@@ -0,0 +1,260 @@
+"""
+Run a set of bucket notification tests on rgw.
+"""
+from io import BytesIO
+from configobj import ConfigObj
+import base64
+import contextlib
+import logging
+import os
+import random
+import string
+
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology.config import config as teuth_config
+from teuthology.orchestra import run
+from teuthology.exceptions import ConfigError
+
+log = logging.getLogger(__name__)
+
+@contextlib.contextmanager
+def download(ctx, config):
+    """
+    Download the bucket notification tests from the given git remote.
+    Remove downloaded test file upon exit.
+    The context passed in should be identical to the context
+    passed in to the main task.
+    """
+    assert isinstance(config, dict)
+    log.info('Downloading bucket-notification-tests...')
+    testdir = teuthology.get_testdir(ctx)
+    for (client, client_config) in config.items():
+        bntests_branch = client_config.get('force-branch', None)
+        if not bntests_branch:
+            raise ValueError(
+                "Could not determine what branch to use for bn-tests. Please add 'force-branch: {bn-tests branch name}' to the .yaml config for this bucket notifications tests task.")
+
+        log.info("Using branch '%s' for bucket notifications tests", bntests_branch)
+        sha1 = client_config.get('sha1')
+        git_remote = client_config.get('git_remote', teuth_config.ceph_git_base_url)
+        ctx.cluster.only(client).run(
+            args=[
+                'git', 'clone',
+                '-b', bntests_branch,
+                git_remote + 'ceph.git',
+                '{tdir}/ceph'.format(tdir=testdir),
+                ],
+            )
+        if sha1 is not None:
+            ctx.cluster.only(client).run(
+                args=[
+                    'cd', '{tdir}/ceph'.format(tdir=testdir),
+                    run.Raw('&&'),
+                    'git', 'reset', '--hard', sha1,
+                    ],
+                )
+    try:
+        yield
+    finally:
+        log.info('Removing bucket-notifications-tests...')
+        testdir = teuthology.get_testdir(ctx)
+        for client in config:
+            ctx.cluster.only(client).run(
+                args=[
+                    'rm',
+                    '-rf',
+                    '{tdir}/ceph'.format(tdir=testdir),
+                    ],
+                )
+
+def _config_user(bntests_conf, section, user):
+    """
+    Configure users for this section by stashing away keys, ids, and
+    email addresses.
+    """
+    bntests_conf[section].setdefault('user_id', user)
+    bntests_conf[section].setdefault('email', '{user}+test@test.test'.format(user=user))
+    bntests_conf[section].setdefault('display_name', 'Mr. {user}'.format(user=user))
+    bntests_conf[section].setdefault('access_key',
+        ''.join(random.choice(string.ascii_uppercase) for i in range(20)))
+    bntests_conf[section].setdefault('secret_key',
+        base64.b64encode(os.urandom(40)).decode())
+
+@contextlib.contextmanager
+def create_users(ctx, config):
+    """
+    Create the main s3 user used by the tests.
+    """
+    assert isinstance(config, dict)
+    log.info('Creating rgw user...')
+    testdir = teuthology.get_testdir(ctx)
+
+    users = {'s3 main': 'foo'}
+    for client in config['clients']:
+        bntests_conf = config['bntests_conf'][client]
+        for section, user in users.items():
+            _config_user(bntests_conf, section, '{user}.{client}'.format(user=user, client=client))
+            log.debug('Creating user {user} on {host}'.format(user=bntests_conf[section]['user_id'], host=client))
+            cluster_name, daemon_type, client_id = teuthology.split_role(client)
+            client_with_id = daemon_type + '.' + client_id
+            ctx.cluster.only(client).run(
+                args=[
+                    'adjust-ulimits',
+                    'ceph-coverage',
+                    '{tdir}/archive/coverage'.format(tdir=testdir),
+                    'radosgw-admin',
+                    '-n', client_with_id,
+                    'user', 'create',
+                    '--uid', bntests_conf[section]['user_id'],
+                    '--display-name', bntests_conf[section]['display_name'],
+                    '--access-key', bntests_conf[section]['access_key'],
+                    '--secret', bntests_conf[section]['secret_key'],
+                    '--cluster', cluster_name,
+                    ],
+                )
+
+    try:
+        yield
+    finally:
+        for client in config['clients']:
+            for user in users.values():
+                uid = '{user}.{client}'.format(user=user, client=client)
+                cluster_name, daemon_type, client_id = teuthology.split_role(client)
+                client_with_id = daemon_type + '.' + client_id
+                ctx.cluster.only(client).run(
+                    args=[
+                        'adjust-ulimits',
+                        'ceph-coverage',
+                        '{tdir}/archive/coverage'.format(tdir=testdir),
+                        'radosgw-admin',
+                        '-n', client_with_id,
+                        'user', 'rm',
+                        '--uid', uid,
+                        '--purge-data',
+                        '--cluster', cluster_name,
+                        ],
+                    )
+
+@contextlib.contextmanager
+def configure(ctx, config):
+    assert isinstance(config, dict)
+    log.info('Configuring bucket-notifications-tests...')
+    testdir = teuthology.get_testdir(ctx)
+    for client, properties in config['clients'].items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+        bntests_conf = config['bntests_conf'][client]
+
+        conf_fp = BytesIO()
+        bntests_conf.write(conf_fp)
+        remote.write_file(
+            path='{tdir}/ceph/src/test/rgw/bucket_notification/bn-tests.{client}.conf'.format(tdir=testdir, client=client),
+            data=conf_fp.getvalue(),
+            )
+
+        remote.run(
+            args=[
+                'cd',
+                '{tdir}/ceph/src/test/rgw/bucket_notification'.format(tdir=testdir),
+                run.Raw('&&'),
+                './bootstrap',
+                ],
+            )
+
+    try:
+        yield
+    finally:
+        log.info('Removing bn-tests.conf file...')
+        testdir = teuthology.get_testdir(ctx)
+        for client, properties in config['clients'].items():
+            (remote,) = ctx.cluster.only(client).remotes.keys()
+            remote.run(
+                 args=['rm', '-f',
+                       '{tdir}/ceph/src/test/rgw/bucket_notification/bn-tests.{client}.conf'.format(tdir=testdir,client=client),
+                 ],
+                 )
+
+@contextlib.contextmanager
+def run_tests(ctx, config):
+    """
+    Run the bucket notifications tests after everything is set up.
+    :param ctx: Context passed to task
+    :param config: specific configuration information
+    """
+    assert isinstance(config, dict)
+    log.info('Running bucket-notifications-tests...')
+    testdir = teuthology.get_testdir(ctx)
+    for client, client_config in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+
+        args = [
+            'BNTESTS_CONF={tdir}/ceph/src/test/rgw/bucket_notification/bn-tests.{client}.conf'.format(tdir=testdir, client=client),
+            '{tdir}/ceph/src/test/rgw/bucket_notification/virtualenv/bin/python'.format(tdir=testdir),
+            '-m', 'nose',
+            '-s',
+            '{tdir}/ceph/src/test/rgw/bucket_notification/test_bn.py'.format(tdir=testdir)
+            ]
+
+        remote.run(
+            args=args,
+            label="bucket notification tests against kafka server"
+            )
+    yield
+
+@contextlib.contextmanager
+def task(ctx,config):
+    """
+    The bucket notification tests require the tox and kafka tasks to run first. Run tox,
+    then kafka, and finally the bucket notification tests like this::
+    tasks:
+    - tox: [ client.0 ]
+    - kafka:
+        client.0:
+    - notification_tests:
+        client.0:
+    """
+    assert config is None or isinstance(config, list) \
+        or isinstance(config, dict), \
+        "task kafka only supports a list or dictionary for configuration"
+
+    all_clients = ['client.{id}'.format(id=id_)
+                   for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
+    if config is None:
+        config = all_clients
+    if isinstance(config, list):
+        config = dict.fromkeys(config)
+    clients=config.keys()
+
+    log.debug('Notifications config is %s', config)
+
+    bntests_conf = {}
+
+    for client in clients:
+        endpoint = ctx.rgw.role_endpoints.get(client)
+        assert endpoint, 'bntests: no rgw endpoint for {}'.format(client)
+
+        bntests_conf[client] = ConfigObj(
+            indent_type='',
+            infile={
+                'DEFAULT':
+                    {
+                    'port':endpoint.port,
+                    'host':endpoint.dns_name,
+                    },
+                's3 main':{}
+            }
+        )
+
+    with contextutil.nested(
+        lambda: download(ctx=ctx, config=config),
+        lambda: create_users(ctx=ctx, config=dict(
+                clients=clients,
+                bntests_conf=bntests_conf,
+                )),
+        lambda: configure(ctx=ctx, config=dict(
+                clients=config,
+                bntests_conf=bntests_conf,
+                )),
+        lambda: run_tests(ctx=ctx, config=config),
+        ):
+        yield
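
configure() serializes each client's ConfigObj into ``bn-tests.<client>.conf``, which setup() in ``src/test/rgw/bucket_notification/__init__.py`` later reads back through the ``BNTESTS_CONF`` environment variable. A small sketch of that round trip with hypothetical values (the real access/secret keys are randomly generated by _config_user(), and host/port come from the rgw endpoint)::

        from io import BytesIO
        from configobj import ConfigObj

        bntests_conf = ConfigObj(indent_type='', infile={
            'DEFAULT': {'host': 'localhost', 'port': 8000},
            's3 main': {
                'user_id': 'foo.client.0',
                'display_name': 'Mr. foo.client.0',
                'email': 'foo.client.0+test@test.test',
                'access_key': 'ABCDEFGHIJKLMNOPQRST',    # placeholder
                'secret_key': 'placeholder-secret-key',  # placeholder
            },
        })
        buf = BytesIO()
        bntests_conf.write(buf)
        print(buf.getvalue().decode())  # same layout as bntests.conf.SAMPLE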
diff --git a/src/test/rgw/bucket_notification/README.rst b/src/test/rgw/bucket_notification/README.rst
index a3973f6ee7e8454826e628e25db22a179a6c0563..e58dea4e0d13559947808c820240fb8549bb3e48 100644 (file)
@@ -1 +1,47 @@
-TO BE FILLED
+============================
+ Bucket Notification tests
+============================
+
+You will need the sample configuration file ``bntests.conf.SAMPLE`` provided at
+``/path/to/ceph/src/test/rgw/bucket_notification/``. You can copy it to the directory
+where you run the tests and modify it as needed. This file is used to run the bucket
+notification tests against a Ceph cluster started with vstart.
+
+You also need Kafka, which you can install by downloading and unpacking the source tarball::
+
+        https://archive.apache.org/dist/kafka/2.6.0/kafka-2.6.0-src.tgz
+
+Then, inside the Kafka config directory (``/path/to/kafka-2.6.0-src/config/``), create a file named ``kafka_server_jaas.conf``
+with the following content::
+
+        KafkaClient {
+          org.apache.kafka.common.security.plain.PlainLoginModule required
+          username="alice"
+          password="alice-secret";
+        };
+
+After creating the file above, build Kafka by running the following command in the Kafka directory (``/path/to/kafka-2.6.0-src/``)::
+
+        ./gradlew jar -PscalaVersion=2.13.2
+
+Next, start the Zookeeper and Kafka services.
+The commands below start each service. To start the
+Zookeeper service, run::
+
+        bin/zookeeper-server-start.sh config/zookeeper.properties
+
+and then start the Kafka service::
+
+        bin/kafka-server-start.sh config/server.properties
+
+To run the Zookeeper and Kafka services in the background, add ``-daemon`` to the commands, for example::
+
+        bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
+
+and::
+
+        bin/kafka-server-start.sh -daemon config/server.properties
+
+After starting the vstart cluster and the Zookeeper and Kafka services, you're ready to run the tests::
+
+        BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py
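
Optionally, before running the tests you can sanity-check the broker from Python. The following is only a sketch, assuming the ``kafka-python`` package from ``requirements.txt`` and a broker on the default ``localhost:9092``; the topic name is arbitrary and is auto-created under Kafka's default settings::

        from kafka import KafkaConsumer, KafkaProducer

        producer = KafkaProducer(bootstrap_servers='localhost:9092')
        producer.send('bn-sanity-check', b'hello')
        producer.flush()

        consumer = KafkaConsumer('bn-sanity-check',
                                 bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=5000)
        print([msg.value for msg in consumer])  # expect [b'hello']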
diff --git a/src/test/rgw/bucket_notification/__init__.py b/src/test/rgw/bucket_notification/__init__.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..6785fce9263447f7aa49040f461a855396a02cdf 100644 (file)
@@ -0,0 +1,48 @@
+import configparser
+import os
+
+def setup():
+    cfg = configparser.RawConfigParser()
+    try:
+        path = os.environ['BNTESTS_CONF']
+    except KeyError:
+        raise RuntimeError(
+            'To run tests, point environment '
+            + 'variable BNTESTS_CONF to a config file.',
+            )
+    cfg.read(path)
+
+    if not cfg.defaults():
+        raise RuntimeError('Your config file is missing the DEFAULT section!')
+    if not cfg.has_section("s3 main"):
+        raise RuntimeError('Your config file is missing the "s3 main" section!')
+
+    defaults = cfg.defaults()
+
+    global default_host
+    default_host = defaults.get("host")
+
+    global default_port
+    default_port = int(defaults.get("port"))
+
+    global main_access_key
+    main_access_key = cfg.get('s3 main',"access_key")
+    
+    global main_secret_key
+    main_secret_key = cfg.get('s3 main',"secret_key")
+
+def get_config_host():
+    global default_host
+    return default_host
+
+def get_config_port():
+    global default_port
+    return default_port
+
+def get_access_key():
+    global main_access_key
+    return main_access_key
+
+def get_secret_key():
+    global main_secret_key
+    return main_secret_key
diff --git a/src/test/rgw/bucket_notification/bntests.conf.SAMPLE b/src/test/rgw/bucket_notification/bntests.conf.SAMPLE
new file mode 100644 (file)
index 0000000..eb3291d
--- /dev/null
@@ -0,0 +1,10 @@
+[DEFAULT]
+port = 8000
+host = localhost
+
+[s3 main]
+access_key = 0555b35654ad1656d804
+secret_key = h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q==
+display_name = M. Tester
+user_id = testid
+email = tester@ceph.com
diff --git a/src/test/rgw/bucket_notification/bootstrap b/src/test/rgw/bucket_notification/bootstrap
new file mode 100755 (executable)
index 0000000..c833622
--- /dev/null
@@ -0,0 +1,45 @@
+#!/bin/sh
+set -e
+
+if [ -f /etc/debian_version ]; then
+    for package in python3-pip python3-virtualenv python3-dev python3-xmltodict python3-pika libevent-dev libxml2-dev libxslt-dev zlib1g-dev; do
+        if [ "$(dpkg --status -- $package 2>/dev/null|sed -n 's/^Status: //p')" != "install ok installed" ]; then
+            # add a space after old values
+            missing="${missing:+$missing }$package"
+        fi
+    done
+    if [ -n "$missing" ]; then
+        echo "$0: missing required DEB packages. Installing via sudo." 1>&2
+        sudo apt-get -y install $missing
+    fi
+fi
+if [ -f /etc/redhat-release ]; then
+    for package in python3-pip python3-virtualenv python3-devel python3-xmltodict python3-pika libevent-devel libxml2-devel libxslt-devel zlib-devel; do
+        if [ "$(rpm -qa $package 2>/dev/null)" == "" ]; then
+            missing="${missing:+$missing }$package"
+        fi
+    done
+    if [ -n "$missing" ]; then
+        echo "$0: missing required RPM packages. Installing via sudo." 1>&2
+        sudo yum -y install $missing
+    fi
+fi
+
+virtualenv -p python3 --system-site-packages --distribute virtualenv
+
+# avoid pip bugs
+./virtualenv/bin/pip install --upgrade pip
+#pip3 install --upgrade setuptools cffi  # address pip issue: https://github.com/pypa/pip/issues/6264
+
+# work-around change in pip 1.5
+#./virtualenv/bin/pip install six
+#./virtualenv/bin/pip install -I nose
+#./virtualenv/bin/pip install setuptools
+
+./virtualenv/bin/pip install -U -r requirements.txt
+
+# forbid setuptools from using the network because it'll try to use
+# easy_install, and we really wanted pip; next line will fail if pip
+# requirements.txt does not match setup.py requirements -- sucky but
+# good enough for now
+./virtualenv/bin/python setup.py develop
diff --git a/src/test/rgw/bucket_notification/requirements.txt b/src/test/rgw/bucket_notification/requirements.txt
new file mode 100644 (file)
index 0000000..0027741
--- /dev/null
@@ -0,0 +1,5 @@
+nose >=1.0.0
+boto >=2.6.0
+boto3 >=1.0.0
+configparser >=5.0.0
+kafka-python >=2.0.0
diff --git a/src/test/rgw/bucket_notification/setup.py b/src/test/rgw/bucket_notification/setup.py
new file mode 100644 (file)
index 0000000..189ab27
--- /dev/null
@@ -0,0 +1,19 @@
+#!/usr/bin/python
+from setuptools import setup, find_packages
+
+setup(
+    name='bn_tests',
+    version='0.0.1',
+    packages=find_packages(),
+
+    author='Kalpesh Pandya',
+    author_email='kapandya@redhat.com',
+    description='Bucket Notification compatibility tests',
+    license='MIT',
+    keywords='bn web testing',
+
+    install_requires=[
+        'boto >=2.0b4',
+        'boto3 >=1.0.0'
+        ],
+    )
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 9e67e1a34d59ffb6ab2fadf73f950a14c6b894f7..9b253b51de7a726c5bf4c211771e3c7c2c1c819d 100644 (file)
@@ -13,6 +13,13 @@ from random import randint
 
 from boto.s3.connection import S3Connection
 
+from . import(
+    get_config_host,
+    get_config_port,
+    get_access_key,
+    get_secret_key
+    )
+
 from .api import PSTopicS3, \
     PSNotificationS3, \
     delete_all_s3_topics, \
@@ -402,14 +409,25 @@ def stop_kafka_receiver(receiver, task):
 def get_ip():
     return 'localhost'
 
+def get_ip_http():
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    try:
+        # address should not be reachable
+        s.connect(('10.255.255.255', 1))
+        ip = s.getsockname()[0]
+    finally:
+        s.close()
+    return ip
+
 def connection():
-    vstart_access_key = '0555b35654ad1656d804'
-    vstart_secret_key = 'h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q=='
-    hostname = get_ip()
+    hostname = get_config_host()
+    port_no = get_config_port()
+    vstart_access_key = get_access_key()
+    vstart_secret_key = get_secret_key()
 
     conn = S3Connection(aws_access_key_id=vstart_access_key,
                      aws_secret_access_key=vstart_secret_key,
-                      is_secure=False, port=8000, host=hostname, 
+                      is_secure=False, port=port_no, host=hostname, 
                       calling_format='boto.s3.connection.OrdinaryCallingFormat')
 
     return conn
@@ -431,9 +449,11 @@ def connection2():
 # bucket notifications tests
 ##############
 
-'''
+
 def test_ps_s3_topic_on_master():
     """ test s3 topics set/get/delete on master """
+    return SkipTest('Get tenant function required.')
+
     zonegroup = 'default' 
     bucket_name = gen_bucket_name()
     conn = connection()
@@ -497,6 +517,8 @@ def test_ps_s3_topic_on_master():
 
 def test_ps_s3_topic_with_secret_on_master():
     """ test s3 topics with secret set/get/delete on master """
+    return SkipTest('secure connection is needed to test topic with secrets')
+
     conn = connection1()
     if conn.secure_conn is None:
         return SkipTest('secure connection is needed to test topic with secrets')
@@ -541,7 +563,6 @@ def test_ps_s3_topic_with_secret_on_master():
 
     # delete topics
     result = topic_conf.del_config()
-'''
 
 
 def test_ps_s3_notification_on_master():
@@ -603,13 +624,15 @@ def test_ps_s3_notification_on_master():
     # delete the bucket
     conn.delete_bucket(bucket_name)
 
+
 def test_ps_s3_notification_filter_on_master():
     """ test s3 notification filter on master """
-    on_master = True
+    return SkipTest('This is an AMQP test.')
+
     hostname = get_ip()
-    if on_master:
-        conn = connection()
-        ps_zone = conn
+    
+    conn = connection()
+    ps_zone = conn
 
     zonegroup = 'default'
 
@@ -626,9 +649,9 @@ def test_ps_s3_notification_filter_on_master():
     # create s3 topic
     endpoint_address = 'amqp://' + hostname
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    if on_master:
-        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
-        topic_arn = topic_conf.set_config()
+        
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
 
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -665,29 +688,28 @@ def test_ps_s3_notification_filter_on_master():
     result, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
-    if on_master:
-        topic_conf_list = [{'Id': notification_name+'_4',
-                            'TopicArn': topic_arn,
-                            'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
-                            'Filter': {
-                                'Metadata': {
-                                    'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
-                                                    {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
-                                },
-                                'Key': {
-                                    'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
-                                }
+    topic_conf_list = [{'Id': notification_name+'_4',
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+                        'Filter': {
+                            'Metadata': {
+                                'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
+                                                {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
+                            },
+                            'Key': {
+                                'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
                             }
-                            }]
+                        }
+                        }]
 
-        try:
-            s3_notification_conf4 = PSNotificationS3(conn, bucket_name, topic_conf_list)
-            _, status = s3_notification_conf4.set_config()
-            assert_equal(status/100, 2)
-            skip_notif4 = False
-        except Exception as error:
-            print('note: metadata filter is not supported by boto3 - skipping test')
-            skip_notif4 = True
+    try:
+        s3_notification_conf4 = PSNotificationS3(conn, bucket_name, topic_conf_list)
+        _, status = s3_notification_conf4.set_config()
+        assert_equal(status/100, 2)
+        skip_notif4 = False
+    except Exception as error:
+        print('note: metadata filter is not supported by boto3 - skipping test')
+        skip_notif4 = True
 
 
     # get all notifications
@@ -736,9 +758,8 @@ def test_ps_s3_notification_filter_on_master():
         key = bucket.new_key(key_name)
         key.set_contents_from_string('bar')
 
-    if on_master:
-        print('wait for 5sec for the messages...')
-        time.sleep(5)
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
 
     found_in1 = []
     found_in2 = []
@@ -873,6 +894,8 @@ def test_ps_s3_notification_errors_on_master():
 
 def test_ps_s3_notification_push_amqp_on_master():
     """ test pushing amqp s3 notification on master """
+    return SkipTest('This is an AMQP test.')
+
     hostname = get_ip()
     conn = connection()
     zonegroup = 'default'
@@ -1121,7 +1144,7 @@ def test_ps_s3_notification_multi_delete_on_master():
 
 def test_ps_s3_notification_push_http_on_master():
     """ test pushing http s3 notification on master """
-    hostname = get_ip()
+    hostname = get_ip_http()
     conn = connection()
     zonegroup = 'default'
 
@@ -1271,6 +1294,8 @@ def test_ps_s3_opaque_data_on_master():
 
 def test_ps_s3_creation_triggers_on_master():
     """ test object creation s3 notifications in using put/copy/post on master"""
+    return SkipTest('This is an AMQP test.')
+
     hostname = get_ip()
     conn = connection()
     zonegroup = 'default'
@@ -1336,6 +1361,8 @@ def test_ps_s3_creation_triggers_on_master():
 
 def test_ps_s3_multipart_on_master():
     """ test multipart object upload on master"""
+    return SkipTest('This is an AMQP test.')
+
     hostname = get_ip()
     conn = connection()
     zonegroup = 'default'
@@ -1424,6 +1451,8 @@ def test_ps_s3_multipart_on_master():
 
 def test_ps_s3_metadata_on_master():
     """ test s3 notification of metadata on master """
+    return SkipTest('This is an AMQP test.')
+
     hostname = get_ip()
     conn = connection()
     zonegroup = 'default'
@@ -1521,6 +1550,8 @@ def test_ps_s3_metadata_on_master():
 
 def test_ps_s3_tags_on_master():
     """ test s3 notification of tags on master """
+    return SkipTest('This is an AMQP test.')
+
     hostname = get_ip()
     conn = connection()
     zonegroup = 'default'
@@ -1594,6 +1625,8 @@ def test_ps_s3_tags_on_master():
 
 def test_ps_s3_versioning_on_master():
     """ test s3 notification of object versions """
+    return SkipTest('This is an AMQP test.')
+
     hostname = get_ip()
     conn = connection()
     zonegroup = 'default'
@@ -1661,6 +1694,8 @@ def test_ps_s3_versioning_on_master():
 
 def test_ps_s3_versioned_deletion_on_master():
     """ test s3 notification of deletion markers on master """
+    return SkipTest('This is an AMQP test.')
+
     hostname = get_ip()
     conn = connection()
     zonegroup = 'default'
@@ -1927,6 +1962,8 @@ def test_ps_s3_persistent_notification_pushback():
 
 def test_ps_s3_persistent_gateways_recovery():
     """ test gateway recovery of persistent notifications """
+    return SkipTest('This test requires two gateways.')
+
     conn = connection()
     zonegroup = 'default'
     # create random port for the http server
@@ -2010,6 +2047,8 @@ def test_ps_s3_persistent_gateways_recovery():
 
 def test_ps_s3_persistent_multiple_gateways():
     """ test pushing persistent notification via two gateways """
+    return SkipTest('This test requires two gateways.')
+
     conn = connection()
     zonegroup = 'default'
     # create random port for the http server
@@ -2212,6 +2251,7 @@ def persistent_notification(endpoint_type):
     host = get_ip()
     if endpoint_type == 'http':
         # create random port for the http server
+        host = get_ip_http()
         port = random.randint(10000, 20000)
         # start an http server in a separate thread
         receiver = StreamingHTTPServer(host, port, num_workers=10)
@@ -2303,8 +2343,15 @@ def test_ps_s3_persistent_notification_http():
 
 def test_ps_s3_persistent_notification_amqp():
     """ test pushing persistent notification amqp """
+    return SkipTest('This is an AMQP test.')
     persistent_notification('amqp')
 
+'''
+def test_ps_s3_persistent_notification_kafka():
+    """ test pushing persistent notification http """
+    persistent_notification('kafka')
+'''
+
 def random_string(length):
     import string
     letters = string.ascii_letters
@@ -2312,6 +2359,8 @@ def random_string(length):
 
 def test_ps_s3_persistent_notification_large():
     """ test pushing persistent notification of large notifications """
+    return SkipTest('This is an AMQP test.')
+
     conn = connection()
     zonegroup = 'default'
 
@@ -2393,13 +2442,12 @@ def test_ps_s3_persistent_notification_large():
     conn.delete_bucket(bucket_name)
     stop_amqp_receiver(receiver, task)
 
-'''
-#################
-# Extra tests
-################
+
 
 def test_ps_s3_topic_update():
     """ test updating topic associated with a notification"""
+    return SkipTest('This test is yet to be modified.')
+
     conn = connection()
     ps_zone = None
     bucket_name = gen_bucket_name()
@@ -2502,8 +2550,12 @@ def test_ps_s3_topic_update():
     topic_conf.del_config()
     conn.delete_bucket(bucket_name)
     http_server.close()
+
+
 def test_ps_s3_notification_update():
     """ test updating the topic of a notification"""
+    return SkipTest('This test is yet to be modified.')
+
     hostname = get_ip()
     conn = connection()
     ps_zone = None
@@ -2586,8 +2638,12 @@ def test_ps_s3_notification_update():
     topic_conf2.del_config()
     conn.delete_bucket(bucket_name)
     http_server.close()
+
+
 def test_ps_s3_multiple_topics_notification():
     """ test notification creation with multiple topics"""
+    return SkipTest('This test is yet to be modified.')
+
     hostname = get_ip()
     zonegroup = 'default'
     conn = connection()
@@ -2684,8 +2740,12 @@ def test_ps_s3_multiple_topics_notification():
         key.delete()
     conn.delete_bucket(bucket_name)
     http_server.close()
+
+
 def kafka_security(security_type):
     """ test pushing kafka s3 notification on master """
+    return SkipTest('This test is yet to be modified.')
+
     conn = connection()
     if security_type == 'SSL_SASL' and master_zone.secure_conn is None:
         return SkipTest("secure connection is needed to test SASL_SSL security")
@@ -2761,8 +2821,13 @@ def kafka_security(security_type):
             key.delete()
         conn.delete_bucket(bucket_name)
         stop_kafka_receiver(receiver, task)
+
+
 def test_ps_s3_notification_push_kafka_security_ssl():
+    return SkipTest('This test is yet to be modified.')
     kafka_security('SSL')
+
 def test_ps_s3_notification_push_kafka_security_ssl_sasl():
+    return SkipTest('This test is yet to be modified.')
     kafka_security('SSL_SASL')
-'''
+