rgwlc: add integration tests for lc triggered bucket notifications
author    yuval Lifshitz <ylifshit@redhat.com>
Thu, 30 Dec 2021 16:14:03 +0000 (18:14 +0200)
committer Matt Benjamin <mbenjamin@redhat.com>
Tue, 4 Jan 2022 20:40:27 +0000 (15:40 -0500)
Signed-off-by: yuval Lifshitz <ylifshit@redhat.com>
src/mrun
src/test/rgw/bucket_notification/api.py
src/test/rgw/bucket_notification/test_bn.py

index b3a25d14db3056a2cffbaa6bf9203bc0109ff870..ba6f8cece41fc274f5fd0c79d7b32c7b4837ec29 100755 (executable)
--- a/src/mrun
+++ b/src/mrun
@@ -6,17 +6,18 @@ root=`dirname $0`
 run_name=$1
 command=$2
 CEPH_BIN=$root
-CEPH_CONF_PATH=$root/run/$run_name
+
+[[ "$run_name" == "noname" ]] && CEPH_CONF_PATH=$root || CEPH_CONF_PATH=$root/run/$run_name
 
 [ -z "$BUILD_DIR" ] && BUILD_DIR=build
 
 if [ -e CMakeCache.txt ]; then
     CEPH_BIN=$PWD/bin
-    CEPH_CONF_PATH=$PWD/run/$run_name
+    [[ "$run_name" == "noname" ]] && CEPH_CONF_PATH=$PWD || CEPH_CONF_PATH=$PWD/run/$run_name
 elif [ -e $root/../${BUILD_DIR}/CMakeCache.txt ]; then
     cd $root/../${BUILD_DIR}
     CEPH_BIN=$PWD/bin
-    CEPH_CONF_PATH=$PWD/run/$run_name
+    [[ "$run_name" == "noname" ]] && CEPH_CONF_PATH=$PWD || CEPH_CONF_PATH=$PWD/run/$run_name
 fi
 
 shift 2
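
The mrun change above treats the literal run name "noname" as a request to use the default configuration that sits next to the binaries, instead of a per-run vstart directory under run/<run_name>. A rough Python sketch of that selection logic, for illustration only (the function name is made up):

    def ceph_conf_path(root, run_name):
        # 'noname' selects the default ceph.conf next to the binaries;
        # any other run name keeps the per-run vstart directory as before
        if run_name == 'noname':
            return root
        return root + '/run/' + run_name
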
index 168bf29df3a96ffb30d341e2a8a6d5751a24f653..2e0fc9ca1df68dda1013d07db99e6004a2407757 100644 (file)
--- a/src/test/rgw/bucket_notification/api.py
+++ b/src/test/rgw/bucket_notification/api.py
@@ -10,6 +10,8 @@ from urllib import parse as urlparse
 from time import gmtime, strftime
 import boto3
 from botocore.client import Config
+import os
+import subprocess
 
 log = logging.getLogger('bucket_notification.tests')
 
@@ -231,3 +233,19 @@ class PSNotificationS3:
         parameters = {'notification': notification}
 
         return self.send_request('DELETE', parameters)
+
+
+test_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__))) + '/../'
+
+def bash(cmd, **kwargs):
+    log.debug('running command: %s', ' '.join(cmd))
+    kwargs['stdout'] = subprocess.PIPE
+    process = subprocess.Popen(cmd, **kwargs)
+    s = process.communicate()[0].decode('utf-8')
+    return (s, process.returncode)
+
+def admin(args, **kwargs):
+    """ radosgw-admin command """
+    cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', 'noname'] + args
+    return bash(cmd, **kwargs)
+
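
The bash()/admin() helpers added to api.py shell out to test-rgw-call.sh with the "noname" run name, so radosgw-admin runs against the default configuration selected by the mrun change above, and the helper returns the command's stdout together with its exit code. A minimal usage sketch, assuming the helper is imported from .api as in the test below (the "lc list" subcommand is only an example):

    out, returncode = admin(['lc', 'list'])   # run "radosgw-admin lc list" via test-rgw-call.sh
    assert returncode == 0                    # non-zero means radosgw-admin failed
    print(out)                                # stdout, decoded as UTF-8 by bash()
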
index 8c7e85c957e0a33ca5bf14f230580bda2227ab33..12113295dc8a7cdb1f4db3054634c760479978a7 100644 (file)
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -13,6 +13,8 @@ from http import server as http_server
 from random import randint
 import hashlib
 from nose.plugins.attrib import attr
+import boto3
+import datetime
 
 from boto.s3.connection import S3Connection
 
@@ -27,7 +29,8 @@ from .api import PSTopicS3, \
     PSNotificationS3, \
     delete_all_s3_topics, \
     delete_all_objects, \
-    put_object_tagging
+    put_object_tagging, \
+    admin
 
 from nose import SkipTest
 from nose.tools import assert_not_equal, assert_equal, assert_in
@@ -1259,8 +1262,12 @@ def test_ps_s3_notification_push_kafka_on_master():
     # name is constant for manual testing
     topic_name = bucket_name+'_topic'
     # create consumer on the topic
-    
+
     try:
+        s3_notification_conf = None
+        topic_conf1 = None
+        topic_conf2 = None
+        receiver = None
         task, receiver = create_kafka_receiver_thread(topic_name+'_1')
         task.start()
 
@@ -1324,15 +1331,21 @@ def test_ps_s3_notification_push_kafka_on_master():
         print('wait for 5sec for the messages...')
         time.sleep(5)
         receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
-
+    except Exception as e:
+        print(e)
+        assert False
     finally:
         # cleanup
-        s3_notification_conf.del_config()
-        topic_conf1.del_config()
-        topic_conf2.del_config()
+        if s3_notification_conf is not None:
+            s3_notification_conf.del_config()
+        if topic_conf1 is not None:
+            topic_conf1.del_config()
+        if topic_conf2 is not None:
+            topic_conf2.del_config()
         # delete the bucket
         conn.delete_bucket(bucket_name)
-        stop_kafka_receiver(receiver, task)
+        if receiver is not None:
+            stop_kafka_receiver(receiver, task)
 
 
 @attr('http_test')
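
The kafka test above is reworked so that every resource handle is initialized to None before the try block and released in finally only if it was actually created; an early failure therefore surfaces as a test failure instead of a secondary error in the finally block. The general shape of the pattern, with purely illustrative names:

    receiver = None
    try:
        receiver = create_receiver()   # may raise before the handle exists
        run_checks(receiver)
    except Exception as e:
        print(e)
        assert False                   # report the original failure to nose
    finally:
        if receiver is not None:
            stop_receiver(receiver)    # clean up only what was created
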
@@ -1556,6 +1569,109 @@ def test_ps_s3_opaque_data_on_master():
     conn.delete_bucket(bucket_name)
     http_server.close()
 
+@attr('http_test')
+def test_ps_s3_lifecycle_on_master():
+    """ test that when object is deleted due to lifecycle policy, notification is sent on master """
+    hostname = get_ip()
+    conn = connection()
+    zonegroup = 'default'
+
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    number_of_objects = 10
+    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address
+    opaque_data = 'http://1.2.3.4:8888'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectLifecycle:Expiration:*']
+                       }]
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    obj_prefix = 'ooo'
+    client_threads = []
+    start_time = time.time()
+    content = 'bar'
+    for i in range(number_of_objects):
+        key = bucket.new_key(obj_prefix + str(i))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+    
+    # create lifecycle policy
+    client = boto3.client('s3',
+            endpoint_url='http://'+conn.host+':'+str(conn.port),
+            aws_access_key_id=conn.aws_access_key_id,
+            aws_secret_access_key=conn.aws_secret_access_key)
+    yesterday = datetime.date.today() - datetime.timedelta(days=1)
+    response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name, 
+            LifecycleConfiguration={'Rules': [
+                {
+                    'ID': 'rule1',
+                    'Expiration': {'Date': yesterday.isoformat()},
+                    'Filter': {'Prefix': obj_prefix},
+                    'Status': 'Enabled',
+                }
+            ]
+        }
+    )
+
+    # start lifecycle processing
+    admin(['lc', 'process'])
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # verify that a lifecycle expiration event was received for every object in the bucket
+    keys = list(bucket.list())
+    print('total number of objects: ' + str(len(keys)))
+    event_keys = []
+    events = http_server.get_and_reset_events()
+    for event in events:
+        assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:Current')
+        event_keys.append(event['Records'][0]['s3']['object']['key'])
+    for key in keys:
+        key_found = False
+        for event_key in event_keys:
+            if event_key == key:
+                key_found = True
+                break
+        if not key_found:
+            err = 'no lifecycle event found for key: ' + str(key)
+            log.error(events)
+            assert False, err
+
+    # cleanup
+    for key in keys:
+        key.delete()
+    [thr.join() for thr in client_threads]
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    http_server.close()
+
+
 def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
     """ test object creation s3 notifications in using put/copy/post on master"""