Notifications
~~~~~~~~~~~~~
-Create a Notification
-`````````````````````
-
-This will create a publisher for a specific bucket into a topic.
-
-::
-
- PUT /<bucket name>?notification
-
-Request parameters are encoded in XML in the body of the request, with the following format:
-
-::
-
- <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
- <TopicConfiguration>
- <Id></Id>
- <Topic></Topic>
- <Event></Event>
- </TopicConfiguration>
- </NotificationConfiguration>
-
-- Id: name of the notification
-- Topic: topic ARN
-- Event: either ``s3:ObjectCreated:*``, or ``s3:ObjectRemoved:*`` (multiple ``Event`` tags may be used)
-
-Delete Notification
-```````````````````
-
-Delete a specific, or all, notifications from a bucket.
-
-::
-
- DELETE /bucket?notification[=<notification-id>]
-
-Request parameters:
-
-- notification-id: name of the notification (if not provided, all notifications on the bucket are deleted)
+Detailed under: `Bucket Operations`_.
.. note::
- - Notification deletion is an extension to the S3 notification API
- - When the bucket is deleted, any notification defined on it is also deleted
-
-Get/List Notifications
-``````````````````````
-
-Get a specific notification, or list all notifications defined on a bucket.
-
-::
-
- GET /bucket?notification[=<notification-id>]
-
-Request parameters:
-
-- notification-id: name of the notification (if not provided, all notifications on the bucket are listed)
-
-Response is XML formatted:
-
-::
-
- <NotificationConfiguration>
- <TopicConfiguration>
- <Id></Id>
- <Topic></Topic>
- <Event></Event>
- </TopicConfiguration>
- </NotificationConfiguration>
-
-- Id: name of the notification
-- Topic: topic ARN
-- Event: for list of supported events see: `S3 Notification Compatibility`_ (to support multiple types, any combination of wildcard events and ``Event`` tags may be used)
-
-
-.. note::
-
- - Getting information on a specific notification is an extension to the S3 notification API
- - When multiple notifications are fetched from the bucket, multiple ``NotificationConfiguration`` tags will be used
+ - "Abort Multipart Upload" request does not emit a notification
+ - "Delete Multiple Objects" request does not emit a notification
+ - Both "Initiate Multipart Upload" and "POST Object" requests will emit an ``s3:ObjectCreated:Post`` notification
Events
.. _PubSub Module : ../pubsub-module
.. _S3 Notification Compatibility: ../s3-notification-compatibility
.. _AWS Create Topic: https://docs.aws.amazon.com/sns/latest/api/API_CreateTopic.html
+.. _Bucket Operations: ../s3/bucketops
S3-Compliant Notifications
~~~~~~~~~~~~~~~~~~~~~~~~~~
-Create a Notification
-`````````````````````
-
-This will create a publisher for a specific bucket into a topic, and a subscription
-for pushing/pulling events.
-The subscription's name will have the same as the notification Id, and could be used later to fetch
-and ack events with the subscription API.
-
-::
-
- PUT /<bucket name>?notification
-
-Request parameters are encoded in XML in the body of the request, with the following format:
-
-::
-
- <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
- <TopicConfiguration>
- <Id></Id>
- <Topic></Topic>
- <Event></Event>
- </TopicConfiguration>
- </NotificationConfiguration>
-
-- Id: name of the notification
-- Topic: topic ARN
-- Event: either ``s3:ObjectCreated:*``, or ``s3:ObjectRemoved:*`` (multiple ``Event`` tags may be used)
-
-Delete Notification
-```````````````````
-
-Delete a specific, or all, S3-compliant notifications from a bucket. Associated subscription will also be deleted.
-
-::
-
- DELETE /bucket?notification[=<notification-id>]
-
-Request parameters:
-
-- notification-id: name of the notification (if not provided, all S3-compliant notifications on the bucket are deleted)
+Detailed under: `Bucket Operations`_.
.. note::
- - Notification deletion is an extension to the S3 notification API
- - When the bucket is deleted, any notification defined on it is also deleted.
- In this case, the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access),
+ - Notification creation will also create a subscription for pushing/pulling events
+ - The generated subscription's name will have the same as the notification Id, and could be used later to fetch and ack events with the subscription API.
+ - Notification deletion will deletes all generated subscriptions
+ - In case that bucket deletion implicitly deletes the notification,
+ the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access),
and will have to be deleted explicitly with the subscription deletion API
-Get/List Notifications
-``````````````````````
-
-Get a specific S3-compliant notification, or list all S3-compliant notifications defined on a bucket.
-
-::
-
- GET /bucket?notification[=<notification-id>]
-
-Request parameters:
-
-- notification-id: name of the notification (if not provided, all S3-compliant notifications on the bucket are listed)
-
-Response is XML formatted:
-
-::
-
- <NotificationConfiguration>
- <TopicConfiguration>
- <Id></Id>
- <Topic></Topic>
- <Event></Event>
- </TopicConfiguration>
- </NotificationConfiguration>
-
-- Id: name of the notification
-- Topic: topic ARN
-- Event: either ``s3:ObjectCreated:*``, or ``s3:ObjectRemoved:*`` (multiple ``Event`` tags may be used)
-
-
-.. note::
-
- - Getting information on a specific notification is an extension to the S3 notification API
- - When multiple notifications are fetched from the bucket, multiple ``NotificationConfiguration`` tags will be used
Non S3-Compliant Notifications
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Request parameters:
- topic-name: name of topic
-- event: event type (string), one of: OBJECT_CREATE, OBJECT_DELETE
+- event: event type (string), one of: ``OBJECT_CREATE``, ``OBJECT_DELETE``, ``DELETE_MARKER_CREATE``
Delete Notification Information
```````````````````````````````
]}
- id: unique ID of the event, that could be used for acking
-- event: either ``OBJECT_CREATE``, or ``OBJECT_DELETE``
+- event: one of: ``OBJECT_CREATE``, ``OBJECT_DELETE``, ``DELETE_MARKER_CREATE``
- timestamp: timestamp indicating when the event was sent
- info.attrs.mtime: timestamp indicating when the event was triggered
- info.bucket.bucket_id: id of the bucket
.. _Multisite : ../multisite
.. _Bucket Notification : ../notifications
+.. _Bucket Operations: ../s3/bucketops
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectCreated:Copy`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
+----------------------------------------------+-----------------+-------------------------------------------+
-| ``s3:ObjectCreated:CompleteMultipartUpload`` | Not Supported | Supported at ``s3:ObjectCreated:*`` level |
-| (an extension to AWS) | | |
+| ``s3:ObjectCreated:CompleteMultipartUpload`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
+----------------------------------------------+-----------------+-------------------------------------------+
-| ``s3:ObjectRemoved:*`` | Supported |
+| ``s3:ObjectRemoved:*`` | Supported | Supported only the specific events below |
+----------------------------------------------+-----------------+-------------------------------------------+
-| ``s3:ObjectRemoved:Delete`` | Supported | supported at ``s3:ObjectRemoved:*`` level |
+| ``s3:ObjectRemoved:Delete`` | Supported |
+----------------------------------------------+-----------------+-------------------------------------------+
-| ``s3:ObjectRemoved:DeleteMarkerCreated`` | Supported at ``s3:ObjectRemoved:*`` level |
+| ``s3:ObjectRemoved:DeleteMarkerCreated`` | Supported |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRestore:Post`` | Not applicable to Ceph |
+----------------------------------------------+-----------------+-------------------------------------------+
| ``Years`` | Integer | The number of years specified for the default retention period. | No |
+-----------------------------+-------------+----------------------------------------------------------------------------------------+----------+
+Create Notification
+-------------------
+
+Create a publisher for a specific bucket into a topic.
+
+Syntax
+~~~~~~
+
+::
+
+ PUT /<bucket name>?notification HTTP/1.1
+
+
+Request Entities
+~~~~~~~~~~~~~~~~
+
+Parameters are XML encoded in the body of the request, in the following format:
+
+::
+
+ <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <TopicConfiguration>
+ <Id></Id>
+ <Topic></Topic>
+ <Event></Event>
+ </TopicConfiguration>
+ </NotificationConfiguration>
+
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| Name | Type | Description | Required |
++===============================+===========+======================================================================================+==========+
+| ``NotificationConfiguration`` | Container | Holding list of ``TopicConfiguration`` entities | Yes |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``TopicConfiguration`` | Container | Holding ``Id``, ``Topic`` and list of ``Event`` entities | Yes |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Id`` | String | Name of the notification | Yes |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Topic`` | String | Topic ARN. Topic must be created beforehand | Yes |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Event`` | String | List of supported events see: `S3 Notification Compatibility`_. Multiple ``Event`` | No |
+| | | entities can be used. If omitted, all events are handled | |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+
+HTTP Response
+~~~~~~~~~~~~~
+
++---------------+-----------------------+----------------------------------------------------------+
+| HTTP Status | Status Code | Description |
++===============+=======================+==========================================================+
+| ``400`` | MalformedXML | The XML is not well-formed |
++---------------+-----------------------+----------------------------------------------------------+
+| ``400`` | InvalidArgument | Missing Id; Missing/Invalid Topic ARN; Invalid Event |
++---------------+-----------------------+----------------------------------------------------------+
+| ``404`` | NoSuchBucket | The bucket does not exist |
++---------------+-----------------------+----------------------------------------------------------+
+| ``404`` | NoSuchKey | The topic does not exist |
++---------------+-----------------------+----------------------------------------------------------+
+
+
+Delete Notification
+-------------------
+
+Delete a specific, or all, notifications from a bucket.
+
+.. note::
+
+ - Notification deletion is an extension to the S3 notification API
+ - When the bucket is deleted, any notification defined on it is also deleted
+ - Deleting an unkown notification (e.g. double delete) is not considered an error
+
+Syntax
+~~~~~~
+
+::
+
+ DELETE /bucket?notification[=<notification-id>] HTTP/1.1
+
+
+Parameters
+~~~~~~~~~~
+
++------------------------+-----------+----------------------------------------------------------------------------------------+
+| Name | Type | Description |
++========================+===========+========================================================================================+
+| ``notification-id`` | String | Name of the notification. If not provided, all notifications on the bucket are deleted |
++------------------------+-----------+----------------------------------------------------------------------------------------+
+
+HTTP Response
+~~~~~~~~~~~~~
+
++---------------+-----------------------+----------------------------------------------------------+
+| HTTP Status | Status Code | Description |
++===============+=======================+==========================================================+
+| ``404`` | NoSuchBucket | The bucket does not exist |
++---------------+-----------------------+----------------------------------------------------------+
+
+Get/List Notification
+---------------------
+
+Get a specific notification, or list all notifications configured on a bucket.
+
+Syntax
+~~~~~~
+
+::
+
+ GET /bucket?notification[=<notification-id>] HTTP/1.1
+
+
+Parameters
+~~~~~~~~~~
+
++------------------------+-----------+----------------------------------------------------------------------------------------+
+| Name | Type | Description |
++========================+===========+========================================================================================+
+| ``notification-id`` | String | Name of the notification. If not provided, all notifications on the bucket are listed |
++------------------------+-----------+----------------------------------------------------------------------------------------+
+
+Response Entities
+~~~~~~~~~~~~~~~~~
+
+Response is XML encoded in the body of the request, in the following format:
+
+::
+
+ <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <TopicConfiguration>
+ <Id></Id>
+ <Topic></Topic>
+ <Event></Event>
+ </TopicConfiguration>
+ </NotificationConfiguration>
+
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| Name | Type | Description | Required |
++===============================+===========+======================================================================================+==========+
+| ``NotificationConfiguration`` | Container | Holding list of ``TopicConfiguration`` entities | Yes |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``TopicConfiguration`` | Container | Holding ``Id``, ``Topic`` and list of ``Event`` entities | Yes |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Id`` | String | Name of the notification | Yes |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Topic`` | String | Topic ARN | Yes |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Event`` | String | Handled event. Multiple ``Event`` entities may exist | Yes |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+
+HTTP Response
+~~~~~~~~~~~~~
+
++---------------+-----------------------+----------------------------------------------------------+
+| HTTP Status | Status Code | Description |
++===============+=======================+==========================================================+
+| ``404`` | NoSuchBucket | The bucket does not exist |
++---------------+-----------------------+----------------------------------------------------------+
+| ``404`` | NoSuchKey | The notification does not exist (if provided) |
++---------------+-----------------------+----------------------------------------------------------+
+
+.. _S3 Notification Compatibility: ../s3-notification-compatibility
continue;
}
event_should_be_handled = true;
+ record.configurationId = topic_filter.s3_id;
+ ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id <<
+ "' on topic: '" << topic_cfg.dest.arn_topic <<
+ "' and bucket: '" << s->bucket.name <<
+ "' (unique topic: '" << topic_cfg.name <<
+ "') apply to event of type: '" << to_string(event_type) << "'" << dendl;
try {
// TODO add endpoint LRU cache
const auto push_endpoint = RGWPubSubEndpoint::create(topic_cfg.dest.push_endpoint,
return "s3:ObjectCreated:Post";
case ObjectCreatedCopy:
return "s3:ObjectCreated:Copy";
+ case ObjectCreatedCompleteMultipartUpload:
+ return "s3:ObjectCreated:CompleteMultipartUpload";
case ObjectRemoved:
return "s3:ObjectRemoved:*";
case ObjectRemovedDelete:
case ObjectCreatedPut:
case ObjectCreatedPost:
case ObjectCreatedCopy:
+ case ObjectCreatedCompleteMultipartUpload:
return "OBJECT_CREATE";
- case ObjectRemoved:
case ObjectRemovedDelete:
return "OBJECT_DELETE";
case ObjectRemovedDeleteMarkerCreated:
return "DELETE_MARKER_CREATE";
+ case ObjectRemoved:
case UnknownEvent:
return "UNKNOWN_EVENT";
}
return ObjectCreatedPost;
if (s == "s3:ObjectCreated:Copy")
return ObjectCreatedCopy;
- if (s == "s3:ObjectRemoved:*" || s == "OBJECT_DELETE")
+ if (s == "s3:ObjectCreated:CompleteMultipartUpload")
+ return ObjectCreatedCompleteMultipartUpload;
+ if (s == "s3:ObjectRemoved:*")
return ObjectRemoved;
- if (s == "s3:ObjectRemoved:Delete")
+ if (s == "s3:ObjectRemoved:Delete" || s == "OBJECT_DELETE")
return ObjectRemovedDelete;
if (s == "s3:ObjectRemoved:DeleteMarkerCreated" || s == "DELETE_MARKER_CREATE")
return ObjectRemovedDeleteMarkerCreated;
namespace rgw::notify {
enum EventType {
- ObjectCreated = 0xF,
- ObjectCreatedPut = 0x1,
- ObjectCreatedPost = 0x2,
- ObjectCreatedCopy = 0x4,
- ObjectRemoved = 0xF0,
- ObjectRemovedDelete = 0x10,
- ObjectRemovedDeleteMarkerCreated = 0x20,
- UnknownEvent = 0x100
+ ObjectCreated = 0xF,
+ ObjectCreatedPut = 0x1,
+ ObjectCreatedPost = 0x2,
+ ObjectCreatedCopy = 0x4,
+ ObjectCreatedCompleteMultipartUpload = 0x8,
+ ObjectRemoved = 0xF0,
+ ObjectRemovedDelete = 0x10,
+ ObjectRemovedDeleteMarkerCreated = 0x20,
+ UnknownEvent = 0x100
};
using EventTypeList = std::vector<EventType>;
return;
}
} while (is_next_file_to_upload());
- // send request to notification manager
- const auto ret = rgw::notify::publish(s, real_time(), etag, rgw::notify::ObjectCreatedPost, store);
+
+ const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
bool check_obj_lock = obj.key.have_instance() && s->bucket_info.obj_lock_enabled();
if (!s->object.empty()) {
+ op_ret = get_obj_attrs(store, s, obj, attrs);
+
if (need_object_expiration() || multipart_delete) {
/* check if obj exists, read orig attrs */
- op_ret = get_obj_attrs(store, s, obj, attrs);
if (op_ret < 0) {
return;
}
if (check_obj_lock) {
/* check if obj exists, read orig attrs */
- op_ret = get_obj_attrs(store, s, obj, attrs);
if (op_ret < 0) {
if (op_ret == -ENOENT) {
/* object maybe delete_marker, skip check_obj_lock*/
}
}
+ // ignore return value from get_obj_attrs in all other cases
+ op_ret = 0;
+
if (check_obj_lock) {
auto aiter = attrs.find(RGW_ATTR_OBJECT_RETENTION);
if (aiter != attrs.end()) {
} else {
op_ret = -EINVAL;
}
- // TODO: add etag calculation
- std::string etag;
- const auto ret = rgw::notify::publish(s, real_time(), etag, rgw::notify::ObjectRemovedDelete, store);
+
+ const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
+ delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
+ store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
this,
s->yield);
- // TODO: use s3:ObjectCreated:Copy
const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
op_ret = obj_op.write_meta(bl.length(), 0, attrs, s->yield);
} while (op_ret == -EEXIST);
+
+ const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store);
+ if (ret < 0) {
+ ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+ // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+ // this should be global conf (probably returnign a different handler)
+ // so we don't need to read the configured values before we perform it
+ }
}
int RGWCompleteMultipart::verify_permission()
} else {
ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
}
+
+ const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
+ if (ret < 0) {
+ ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+ // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+ // this should be global conf (probably returnign a different handler)
+ // so we don't need to read the configured values before we perform it
+ }
}
int RGWCompleteMultipart::MPSerializer::try_lock(
CephContext* const cct;
const std::string endpoint;
const std::string topic;
+ const std::string exchange;
amqp::connection_ptr_t conn;
ack_level_t ack_level;
std::string str_ack_level;
CephContext* _cct) :
cct(_cct),
endpoint(_endpoint),
- topic(_topic),
- conn(amqp::connect(endpoint, get_exchange(args))) {
+ topic(_topic),
+ exchange(get_exchange(args)),
+ conn(amqp::connect(endpoint, exchange)) {
if (!conn) {
throw configuration_error("AMQP: failed to create connection to: " + endpoint);
}
std::string str("AMQP(0.9.1) Endpoint");
str += "\nURI: " + endpoint;
str += "\nTopic: " + topic;
+ str += "\nExchange: " + exchange;
str += "\nAck Level: " + str_ack_level;
return str;
}
ups->remove_topic(unique_topic_name);
return;
}
- ldout(s->cct, 20) << "successfully auto-generated notification for unique topic'" << unique_topic_name << "'" << dendl;
+ ldout(s->cct, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
if (!push_only) {
// generate the subscription with destination information from the original topic
}
std::string events_str = s->info.args.get("events", &exists);
- if (exists) {
- rgw::notify::from_string_list(events_str, events);
- if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
- ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
- return -EINVAL;
- }
- } else {
- // if no events are provided, we assume all events
- events.push_back(rgw::notify::ObjectCreated);
- events.push_back(rgw::notify::ObjectRemoved);
+ if (!exists) {
+ // if no events are provided, we notify on all of them
+ events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
+ }
+ rgw::notify::from_string_list(events_str, events);
+ if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
+ ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
+ return -EINVAL;
}
return notif_bucket_path(s->object.name, bucket_name);
}
gen_bucket_name, \
get_user, \
get_tenant
-from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
+from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info, delete_all_s3_topics
from multisite import User
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal
def __init__(self, exchange, topic):
import pika
hostname = get_ip()
- retries = 20
- connect_ok = False
- while retries > 0:
- try:
- connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
- connect_ok = True
- break
- except Exception as error:
- retries -= 1
- print 'AMQP receiver failed to connect (try %d): %s' % (10 - retries, str(error))
- log.info('AMQP receiver failed to connect (try %d): %s', 10 - retries, str(error))
- time.sleep(2)
-
- if connect_ok == False:
- raise error
-
+ connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
+
self.channel = connection.channel()
self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
result = self.channel.queue_declare('', exclusive=True)
on_message_callback=self.on_message,
auto_ack=True)
self.events = []
+ self.topic = topic
def on_message(self, ch, method, properties, body):
"""callback invoked when a new message arrive on the topic"""
- log.info('AMQP received event: %s', body)
+ log.info('AMQP received event for topic %s:\n %s', self.topic, body)
self.events.append(json.loads(body))
# TODO create a base class for the AMQP and HTTP cases
verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
self.events = []
+ def get_and_reset_events(self):
+ tmp = self.events
+ self.events = []
+ return tmp
+
def amqp_receiver_thread_runner(receiver):
"""main thread function for the amqp receiver"""
zonegroup = realm.master_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
+
+ # clean all topics
+ delete_all_s3_topics(zones[0].conn, zonegroup.name)
# create s3 topics
endpoint_address = 'amqp://127.0.0.1:7001'
assert_equal(topic_arn,
'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')
- try:
- # get topic 3
- result, status = topic_conf3.get_config()
- assert_equal(status, 200)
- assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
- assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
- # Note that endpoint args may be ordered differently in the result
-
- # delete topic 1
- result = topic_conf1.del_config()
- assert_equal(status, 200)
-
- # try to get a deleted topic (1)
- _, status = topic_conf1.get_config()
- assert_equal(status, 404)
-
- # get the remaining 2 topics
- result = topic_conf1.get_list()
- assert_equal(len(result['Topics']), 2)
-
- # delete topics
- result = topic_conf2.del_config()
- # TODO: should be 200OK
- # assert_equal(status, 200)
- result = topic_conf3.del_config()
- # TODO: should be 200OK
- # assert_equal(status, 200)
-
- # get topic list, make sure it is empty
- result = topic_conf1.get_list()
- assert_equal(len(result['Topics']), 0)
- except AssertionError as e:
- # topics are stored at user level, so cleanup is needed
- # to prevent failures in consequent runs
- topic_conf1.del_config()
- topic_conf2.del_config()
- topic_conf3.del_config()
- raise e
+ # get topic 3
+ result, status = topic_conf3.get_config()
+ assert_equal(status, 200)
+ assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+ assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+ # Note that endpoint args may be ordered differently in the result
+
+ # delete topic 1
+ result = topic_conf1.del_config()
+ assert_equal(status, 200)
+
+ # try to get a deleted topic
+ _, status = topic_conf1.get_config()
+ assert_equal(status, 404)
+
+ # get the remaining 2 topics
+ result = topic_conf1.get_list()
+ assert_equal(len(result['Topics']), 2)
+
+ # delete topics
+ result = topic_conf2.del_config()
+ # TODO: should be 200OK
+ # assert_equal(status, 200)
+ result = topic_conf3.del_config()
+ # TODO: should be 200OK
+ # assert_equal(status, 200)
+
+ # get topic list, make sure it is empty
+ result = topic_conf1.get_list()
+ assert_equal(len(result['Topics']), 0)
def test_ps_s3_notification_on_master():
assert False, 'missing notification name is expected to fail'
# create s3 notification with invalid topic ARN
+ invalid_topic_arn = 'kaboom'
topic_conf_list = [{'Id': notification_name,
- 'TopicArn': 'kaboom',
+ 'TopicArn': invalid_topic_arn,
'Events': ['s3:ObjectCreated:Put']
}]
s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
else:
assert False, 'invalid ARN is expected to fail'
+ # create s3 notification with unknown topic ARN
+ invalid_topic_arn = 'arn:aws:sns:a::kaboom'
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': invalid_topic_arn ,
+ 'Events': ['s3:ObjectCreated:Put']
+ }]
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ try:
+ _, _ = s3_notification_conf.set_config()
+ except Exception as error:
+ print str(error) + ' - is expected'
+ else:
+ assert False, 'unknown topic is expected to fail'
+
# create s3 notification with wrong bucket
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
else:
assert False, 'unknown bucket is expected to fail'
- # cleanup
topic_conf.del_config()
+
+ status = topic_conf.del_config()
+ # deleting an unknown notification is not considered an error
+ assert_equal(status, 200)
+
+ _, status = topic_conf.get_config()
+ assert_equal(status, 404)
+
+ # cleanup
# delete the bucket
zones[0].delete_bucket(bucket_name)
start_time = time.time()
for i in range(number_of_objects):
key = bucket.new_key(str(i))
- content = 'bar'
+ content = str(os.urandom(1024*1024))
thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
thr.start()
client_threads.append(thr)
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
- 'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy', 's3:ObjectCreated:Post']
+ 'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
}]
s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
clean_rabbitmq(proc)
+def test_ps_s3_multipart_on_master():
+ """ test multipart object upload on master"""
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ zones, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receivers
+ exchange = 'ex1'
+ task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
+ task1.start()
+ task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
+ task2.start()
+ task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
+ task3.start()
+
+ # create s3 topics
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
+ topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf2.set_config()
+ topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn3 = topic_conf3.set_config()
+
+ # create s3 notifications
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
+ 'Events': ['s3:ObjectCreated:*']
+ },
+ {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
+ 'Events': ['s3:ObjectCreated:Post']
+ },
+ {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
+ 'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
+ }]
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket using multi-part upload
+ fp = tempfile.TemporaryFile(mode='w+b')
+ content = bytearray(os.urandom(1024*1024))
+ fp.write(content)
+ fp.flush()
+ fp.seek(0)
+ uploader = bucket.initiate_multipart_upload('multipart')
+ uploader.upload_part_from_file(fp, 1)
+ uploader.complete_upload()
+ fp.close()
+
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+
+ # check amqp receiver
+ events = receiver1.get_and_reset_events()
+ assert_equal(len(events), 3)
+
+ events = receiver2.get_and_reset_events()
+ assert_equal(len(events), 1)
+ assert_equal(events[0]['eventName'], 's3:ObjectCreated:Post')
+ assert_equal(events[0]['s3']['configurationId'], notification_name+'_2')
+
+ events = receiver3.get_and_reset_events()
+ assert_equal(len(events), 1)
+ assert_equal(events[0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
+ assert_equal(events[0]['s3']['configurationId'], notification_name+'_3')
+
+ # cleanup
+ stop_amqp_receiver(receiver1, task1)
+ stop_amqp_receiver(receiver2, task2)
+ stop_amqp_receiver(receiver3, task3)
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ topic_conf2.del_config()
+ topic_conf3.del_config()
+ for key in bucket.list():
+ key.delete()
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
+
def test_ps_versioned_deletion():
""" test notification of deletion markers """
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
- # create topic
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
- topic_conf.set_config()
+ # create topics
+ topic_conf1 = PSTopic(ps_zones[0].conn, topic_name+'_1')
+ _, status = topic_conf1.set_config()
+ assert_equal(status/100, 2)
+ topic_conf2 = PSTopic(ps_zones[0].conn, topic_name+'_2')
+ _, status = topic_conf2.set_config()
+ assert_equal(status/100, 2)
+
# create bucket on the first of the rados zones
bucket = zones[0].create_bucket(bucket_name)
bucket.configure_versioning(True)
+
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
+
# create notifications
- # TODO use 'DELETE_MARKER_CREATE'
- event_type = 'OBJECT_DELETE'
- notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
- topic_name, event_type)
- _, status = notification_conf.set_config()
+ event_type1 = 'OBJECT_DELETE'
+ notification_conf1 = PSNotification(ps_zones[0].conn, bucket_name,
+ topic_name+'_1',
+ event_type1)
+ _, status = notification_conf1.set_config()
+ assert_equal(status/100, 2)
+ event_type2 = 'DELETE_MARKER_CREATE'
+ notification_conf2 = PSNotification(ps_zones[0].conn, bucket_name,
+ topic_name+'_2',
+ event_type2)
+ _, status = notification_conf2.set_config()
assert_equal(status/100, 2)
- # create subscription
- sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
- topic_name)
- _, status = sub_conf.set_config()
+
+ # create subscriptions
+ sub_conf1 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_1',
+ topic_name+'_1')
+ _, status = sub_conf1.set_config()
+ assert_equal(status/100, 2)
+ sub_conf2 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_2',
+ topic_name+'_2')
+ _, status = sub_conf2.set_config()
assert_equal(status/100, 2)
+
# create objects in the bucket
key = bucket.new_key('foo')
key.set_contents_from_string('bar')
v1 = key.version_id
key.set_contents_from_string('kaboom')
v2 = key.version_id
+ # create deletion marker
+ delete_marker_key = bucket.delete_key(key.name)
+
# wait for sync
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
- # set delete markers
+
+ # delete the deletion marker
+ delete_marker_key.delete()
+ # delete versions
bucket.delete_key(key.name, version_id=v2)
bucket.delete_key(key.name, version_id=v1)
+
# wait for sync
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
# get the delete events from the subscription
- result, _ = sub_conf.get_events()
+ result, _ = sub_conf1.get_events()
parsed_result = json.loads(result)
for event in parsed_result['events']:
log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
- assert_equal(str(event['event']), event_type)
+ assert_equal(str(event['event']), event_type1)
- # TODO: verify we have exactly 2 events
- assert len(parsed_result['events']) >= 2
+ result, _ = sub_conf2.get_events()
+ parsed_result = json.loads(result)
+ for event in parsed_result['events']:
+ log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
+ assert_equal(str(event['event']), event_type2)
# cleanup
# follwing is needed for the cleanup in the case of 3-zones
zones[0].delete_bucket(bucket_name)
except:
log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
- sub_conf.del_config()
- notification_conf.del_config()
- topic_conf.del_config()
+ sub_conf1.del_config()
+ sub_conf2.del_config()
+ notification_conf1.del_config()
+ notification_conf2.del_config()
+ topic_conf1.del_config()
+ topic_conf2.del_config()
def test_ps_s3_metadata_on_master():
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
# TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
- topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
- 'Events': ['s3:ObjectRemoved:Delete', 's3:ObjectCreated:Put']
+ topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:*']
+ },
+ {'Id': notification_name+'_2', 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:DeleteMarkerCreated']
+ },
+ {'Id': notification_name+'_3', 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:Delete']
}]
s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
v1 = key.version_id
key.set_contents_from_string('kaboom')
v2 = key.version_id
- keys = list(bucket.list())
+ # create delete marker (non versioned deletion)
+ delete_marker_key = bucket.delete_key(key.name)
- print 'wait for 5sec for the messages...'
- time.sleep(5)
+ time.sleep(1)
- # check amqp receiver
- # Note: should not do exact match in case of versioned objects
- receiver.verify_s3_events(keys, exact_match=False)
- # set delete markers
+ # versioned deletion
bucket.delete_key(key.name, version_id=v2)
bucket.delete_key(key.name, version_id=v1)
+ delete_marker_key.delete()
print 'wait for 5sec for the messages...'
time.sleep(5)
# check amqp receiver
- # Note: should not do exact match in case of versioned objects
- receiver.verify_s3_events(keys, exact_match=False, deletions=True)
+ events = receiver.get_and_reset_events()
+ delete_events = 0
+ delete_marker_create_events = 0
+ for event in events:
+ if event['eventName'] == 's3:ObjectRemoved:Delete':
+ delete_events += 1
+ assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
+ if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
+ delete_marker_create_events += 1
+ assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
+
+ # 3 key versions were deleted (v1, v2 and the deletion marker)
+ # notified over the same topic via 2 notifications (1,3)
+ assert_equal(delete_events, 3*2)
+ # 1 deletion marker was created
+ # notified over the same topic over 2 notifications (1,2)
+ assert_equal(delete_marker_create_events, 1*2)
# cleanup
stop_amqp_receiver(receiver, task)
return self.send_request('GET', get_list=True)
+def delete_all_s3_topics(conn, region):
+ try:
+ client = boto3.client('sns',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key,
+ region_name=region,
+ config=Config(signature_version='s3'))
+
+ topics = client.list_topics()['Topics']
+ for topic in topics:
+ print 'topic cleanup, deleting: ' + topic['TopicArn']
+ assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
+ except:
+ print 'failed to do topic cleanup. if there are topics they may need to be manually deleted'
+
+
class PSTopicS3:
"""class to set/list/get/delete a topic
POST ?Action=CreateTopic&Name=<topic name>&push-endpoint=<endpoint>&[<arg1>=<value1>...]]