from time import gmtime, strftime
import boto3
from botocore.client import Config
+import os
+import subprocess
log = logging.getLogger('bucket_notification.tests')
parameters = {'notification': notification}
return self.send_request('DELETE', parameters)
+
+
+# directory above this file, where the test-rgw-call.sh wrapper script lives
+test_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__))) + '/../'
+
+def bash(cmd, **kwargs):
+    """ run the given command and return its (stdout, returncode) tuple """
+    log.debug('running command: %s', ' '.join(cmd))
+    kwargs['stdout'] = subprocess.PIPE
+    process = subprocess.Popen(cmd, **kwargs)
+    s = process.communicate()[0].decode('utf-8')
+    return (s, process.returncode)
+
+def admin(args, **kwargs):
+ """ radosgw-admin command """
+ cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', 'noname'] + args
+ return bash(cmd, **kwargs)
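+# usage sketch: bash() returns (stdout, returncode), so callers can check the
+# exit status of radosgw-admin, e.g.:
+#   out, rc = admin(['lc', 'process'])
+#   assert rc == 0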
+
from random import randint
import hashlib
from nose.plugins.attrib import attr
+import boto3
+import datetime
from boto.s3.connection import S3Connection
PSNotificationS3, \
delete_all_s3_topics, \
delete_all_objects, \
- put_object_tagging
+ put_object_tagging, \
+ admin
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal, assert_in
# name is constant for manual testing
topic_name = bucket_name+'_topic'
# create consumer on the topic
-
+
try:
+ s3_notification_conf = None
+ topic_conf1 = None
+ topic_conf2 = None
+ receiver = None
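+        # the None defaults let the finally block below clean up only the
+        # resources that were actually created before a failure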
task, receiver = create_kafka_receiver_thread(topic_name+'_1')
task.start()
print('wait for 5sec for the messages...')
time.sleep(5)
receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
-
+    except Exception as e:
+        print(e)
+        assert False, str(e)
finally:
# cleanup
- s3_notification_conf.del_config()
- topic_conf1.del_config()
- topic_conf2.del_config()
+ if s3_notification_conf is not None:
+ s3_notification_conf.del_config()
+ if topic_conf1 is not None:
+ topic_conf1.del_config()
+ if topic_conf2 is not None:
+ topic_conf2.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
- stop_kafka_receiver(receiver, task)
+ if receiver is not None:
+ stop_kafka_receiver(receiver, task)
@attr('http_test')
conn.delete_bucket(bucket_name)
http_server.close()
+@attr('http_test')
+def test_ps_s3_lifecycle_on_master():
+    """ test that when an object is deleted due to a lifecycle policy, a notification is sent on master """
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
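+    # endpoint_args ends up as 'push-endpoint=http://<host>:<port>' for this run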
+ opaque_data = 'http://1.2.3.4:8888'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectLifecycle:Expiration:*']
+ }]
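+    # the wildcard subscribes to every lifecycle expiration event; for the
+    # current-version objects created below, the receiver should get
+    # 'ObjectLifecycle:Expiration:Current' records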
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ obj_prefix = 'ooo'
+ client_threads = []
+ start_time = time.time()
+ content = 'bar'
+ for i in range(number_of_objects):
+ key = bucket.new_key(obj_prefix + str(i))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ # create lifecycle policy
+ client = boto3.client('s3',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key)
+ yesterday = datetime.date.today() - datetime.timedelta(days=1)
+ response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name,
+ LifecycleConfiguration={'Rules': [
+ {
+ 'ID': 'rule1',
+ 'Expiration': {'Date': yesterday.isoformat()},
+ 'Filter': {'Prefix': obj_prefix},
+ 'Status': 'Enabled',
+ }
+ ]
+ }
+ )
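+    # the rule is dated yesterday, so all prefixed objects are already eligible
+    # for expiration on the next lifecycle run; a readback sanity check could
+    # use the standard boto3 call, e.g.:
+    #   client.get_bucket_lifecycle_configuration(Bucket=bucket_name)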
+
+    # start lifecycle processing and verify the admin command succeeded
+    _, result = admin(['lc', 'process'])
+    assert_equal(result, 0)
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+    # check that the http receiver got a lifecycle expiration event for every object
+ keys = list(bucket.list())
+ print('total number of objects: ' + str(len(keys)))
+ event_keys = []
+ events = http_server.get_and_reset_events()
+ for event in events:
+ assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:Current')
+ event_keys.append(event['Records'][0]['s3']['object']['key'])
+ for key in keys:
+ key_found = False
+ for event_key in event_keys:
+            if event_key == key.name:
+ key_found = True
+ break
+ if not key_found:
+ err = 'no lifecycle event found for key: ' + str(key)
+ log.error(events)
+ assert False, err
+
+ # cleanup
+ for key in keys:
+ key.delete()
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
""" test object creation s3 notifications in using put/copy/post on master"""