From 06eb0df290042bea6e866c4edeea82bfa79833d0 Mon Sep 17 00:00:00 2001 From: yuval Lifshitz Date: Thu, 30 Dec 2021 18:14:03 +0200 Subject: [PATCH] rgwlc: add integration tests for lc triggered bucket notifications Signed-off-by: yuval Lifshitz --- src/mrun | 7 +- src/test/rgw/bucket_notification/api.py | 18 +++ src/test/rgw/bucket_notification/test_bn.py | 130 ++++++++++++++++++-- 3 files changed, 145 insertions(+), 10 deletions(-) diff --git a/src/mrun b/src/mrun index b3a25d14db3..ba6f8cece41 100755 --- a/src/mrun +++ b/src/mrun @@ -6,17 +6,18 @@ root=`dirname $0` run_name=$1 command=$2 CEPH_BIN=$root -CEPH_CONF_PATH=$root/run/$run_name + +[[ "$run_name" == "noname" ]] && CEPH_CONF_PATH=$root || CEPH_CONF_PATH=$root/run/$run_name [ -z "$BUILD_DIR" ] && BUILD_DIR=build if [ -e CMakeCache.txt ]; then CEPH_BIN=$PWD/bin - CEPH_CONF_PATH=$PWD/run/$run_name + [[ "$run_name" == "noname" ]] && CEPH_CONF_PATH=$PWD || CEPH_CONF_PATH=$PWD/run/$run_name elif [ -e $root/../${BUILD_DIR}/CMakeCache.txt ]; then cd $root/../${BUILD_DIR} CEPH_BIN=$PWD/bin - CEPH_CONF_PATH=$PWD/run/$run_name + [[ "$run_name" == "noname" ]] && CEPH_CONF_PATH=$PWD || CEPH_CONF_PATH=$PWD/run/$run_name fi shift 2 diff --git a/src/test/rgw/bucket_notification/api.py b/src/test/rgw/bucket_notification/api.py index 168bf29df3a..2e0fc9ca1df 100644 --- a/src/test/rgw/bucket_notification/api.py +++ b/src/test/rgw/bucket_notification/api.py @@ -10,6 +10,8 @@ from urllib import parse as urlparse from time import gmtime, strftime import boto3 from botocore.client import Config +import os +import subprocess log = logging.getLogger('bucket_notification.tests') @@ -231,3 +233,19 @@ class PSNotificationS3: parameters = {'notification': notification} return self.send_request('DELETE', parameters) + + +test_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__))) + '/../' + +def bash(cmd, **kwargs): + log.debug('running command: %s', ' '.join(cmd)) + kwargs['stdout'] = subprocess.PIPE + process = subprocess.Popen(cmd, **kwargs) + s = process.communicate()[0].decode('utf-8') + return (s, process.returncode) + +def admin(args, **kwargs): + """ radosgw-admin command """ + cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', 'noname'] + args + return bash(cmd, **kwargs) + diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 8c7e85c957e..12113295dc8 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -13,6 +13,8 @@ from http import server as http_server from random import randint import hashlib from nose.plugins.attrib import attr +import boto3 +import datetime from boto.s3.connection import S3Connection @@ -27,7 +29,8 @@ from .api import PSTopicS3, \ PSNotificationS3, \ delete_all_s3_topics, \ delete_all_objects, \ - put_object_tagging + put_object_tagging, \ + admin from nose import SkipTest from nose.tools import assert_not_equal, assert_equal, assert_in @@ -1259,8 +1262,12 @@ def test_ps_s3_notification_push_kafka_on_master(): # name is constant for manual testing topic_name = bucket_name+'_topic' # create consumer on the topic - + try: + s3_notification_conf = None + topic_conf1 = None + topic_conf2 = None + receiver = None task, receiver = create_kafka_receiver_thread(topic_name+'_1') task.start() @@ -1324,15 +1331,21 @@ def test_ps_s3_notification_push_kafka_on_master(): print('wait for 5sec for the messages...') time.sleep(5) receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags) - + except Exception as e: + print(e) + assert False finally: # cleanup - s3_notification_conf.del_config() - topic_conf1.del_config() - topic_conf2.del_config() + if s3_notification_conf is not None: + s3_notification_conf.del_config() + if topic_conf1 is not None: + topic_conf1.del_config() + if topic_conf2 is not None: + topic_conf2.del_config() # delete the bucket conn.delete_bucket(bucket_name) - stop_kafka_receiver(receiver, task) + if receiver is not None: + stop_kafka_receiver(receiver, task) @attr('http_test') @@ -1556,6 +1569,109 @@ def test_ps_s3_opaque_data_on_master(): conn.delete_bucket(bucket_name) http_server.close() +@attr('http_test') +def test_ps_s3_lifecycle_on_master(): + """ test that when object is deleted due to lifecycle policy, notification is sent on master """ + hostname = get_ip() + conn = connection() + zonegroup = 'default' + + # create random port for the http server + host = get_ip() + port = random.randint(10000, 20000) + # start an http server in a separate thread + number_of_objects = 10 + http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) + + # create bucket + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address + opaque_data = 'http://1.2.3.4:8888' + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectLifecycle:Expiration:*'] + }] + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the bucket + obj_prefix = 'ooo' + client_threads = [] + start_time = time.time() + content = 'bar' + for i in range(number_of_objects): + key = bucket.new_key(obj_prefix + str(i)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + time_diff = time.time() - start_time + print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + # create lifecycle policy + client = boto3.client('s3', + endpoint_url='http://'+conn.host+':'+str(conn.port), + aws_access_key_id=conn.aws_access_key_id, + aws_secret_access_key=conn.aws_secret_access_key) + yesterday = datetime.date.today() - datetime.timedelta(days=1) + response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name, + LifecycleConfiguration={'Rules': [ + { + 'ID': 'rule1', + 'Expiration': {'Date': yesterday.isoformat()}, + 'Filter': {'Prefix': obj_prefix}, + 'Status': 'Enabled', + } + ] + } + ) + + # start lifecycle processing + admin(['lc', 'process']) + print('wait for 5sec for the messages...') + time.sleep(5) + + # check http receiver does not have messages + keys = list(bucket.list()) + print('total number of objects: ' + str(len(keys))) + event_keys = [] + events = http_server.get_and_reset_events() + for event in events: + assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:Current') + event_keys.append(event['Records'][0]['s3']['object']['key']) + for key in keys: + key_found = False + for event_key in event_keys: + if event_key == key: + key_found = True + break + if not key_found: + err = 'no lifecycle event found for key: ' + str(key) + log.error(events) + assert False, err + + # cleanup + for key in keys: + key.delete() + [thr.join() for thr in client_threads] + topic_conf.del_config() + s3_notification_conf.del_config(notification=notification_name) + # delete the bucket + conn.delete_bucket(bucket_name) + http_server.close() + + def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'): """ test object creation s3 notifications in using put/copy/post on master""" -- 2.39.5