From 923799fd960f97b8ddbbe0e106139069d640588b Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Mon, 2 Sep 2019 19:24:46 +0300 Subject: [PATCH] rgw/pubsub: support deletion markers and multipart upload Signed-off-by: Yuval Lifshitz --- doc/radosgw/notifications.rst | 79 +--- doc/radosgw/pubsub-module.rst | 88 +---- doc/radosgw/s3-notification-compatibility.rst | 9 +- doc/radosgw/s3/bucketops.rst | 159 +++++++++ src/rgw/rgw_notify.cc | 6 + src/rgw/rgw_notify_event_type.cc | 11 +- src/rgw/rgw_notify_event_type.h | 17 +- src/rgw/rgw_op.cc | 35 +- src/rgw/rgw_pubsub_push.cc | 7 +- src/rgw/rgw_rest_pubsub.cc | 2 +- src/rgw/rgw_sync_module_pubsub_rest.cc | 18 +- src/test/rgw/rgw_multi/tests_ps.py | 337 +++++++++++++----- src/test/rgw/rgw_multi/zone_ps.py | 17 + 13 files changed, 504 insertions(+), 281 deletions(-) diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst index 9f4f68e5c92b0..da8ad68f97c38 100644 --- a/doc/radosgw/notifications.rst +++ b/doc/radosgw/notifications.rst @@ -205,83 +205,13 @@ Response will have the following format: Notifications ~~~~~~~~~~~~~ -Create a Notification -````````````````````` - -This will create a publisher for a specific bucket into a topic. - -:: - - PUT /?notification - -Request parameters are encoded in XML in the body of the request, with the following format: - -:: - - - - - - - - - -- Id: name of the notification -- Topic: topic ARN -- Event: either ``s3:ObjectCreated:*``, or ``s3:ObjectRemoved:*`` (multiple ``Event`` tags may be used) - -Delete Notification -``````````````````` - -Delete a specific, or all, notifications from a bucket. - -:: - - DELETE /bucket?notification[=] - -Request parameters: - -- notification-id: name of the notification (if not provided, all notifications on the bucket are deleted) +Detailed under: `Bucket Operations`_. .. note:: - - Notification deletion is an extension to the S3 notification API - - When the bucket is deleted, any notification defined on it is also deleted - -Get/List Notifications -`````````````````````` - -Get a specific notification, or list all notifications defined on a bucket. - -:: - - GET /bucket?notification[=] - -Request parameters: - -- notification-id: name of the notification (if not provided, all notifications on the bucket are listed) - -Response is XML formatted: - -:: - - - - - - - - - -- Id: name of the notification -- Topic: topic ARN -- Event: for list of supported events see: `S3 Notification Compatibility`_ (to support multiple types, any combination of wildcard events and ``Event`` tags may be used) - - -.. note:: - - - Getting information on a specific notification is an extension to the S3 notification API - - When multiple notifications are fetched from the bucket, multiple ``NotificationConfiguration`` tags will be used + - "Abort Multipart Upload" request does not emit a notification + - "Delete Multiple Objects" request does not emit a notification + - Both "Initiate Multipart Upload" and "POST Object" requests will emit an ``s3:ObjectCreated:Post`` notification Events @@ -356,3 +286,4 @@ pushed or pulled using the pubsub sync module. .. _PubSub Module : ../pubsub-module .. _S3 Notification Compatibility: ../s3-notification-compatibility .. _AWS Create Topic: https://docs.aws.amazon.com/sns/latest/api/API_CreateTopic.html +.. _Bucket Operations: ../s3/bucketops diff --git a/doc/radosgw/pubsub-module.rst b/doc/radosgw/pubsub-module.rst index 7dc5b0e913501..6f98f63f89193 100644 --- a/doc/radosgw/pubsub-module.rst +++ b/doc/radosgw/pubsub-module.rst @@ -232,88 +232,17 @@ List all topics that user defined. S3-Compliant Notifications ~~~~~~~~~~~~~~~~~~~~~~~~~~ -Create a Notification -````````````````````` - -This will create a publisher for a specific bucket into a topic, and a subscription -for pushing/pulling events. -The subscription's name will have the same as the notification Id, and could be used later to fetch -and ack events with the subscription API. - -:: - - PUT /?notification - -Request parameters are encoded in XML in the body of the request, with the following format: - -:: - - - - - - - - - -- Id: name of the notification -- Topic: topic ARN -- Event: either ``s3:ObjectCreated:*``, or ``s3:ObjectRemoved:*`` (multiple ``Event`` tags may be used) - -Delete Notification -``````````````````` - -Delete a specific, or all, S3-compliant notifications from a bucket. Associated subscription will also be deleted. - -:: - - DELETE /bucket?notification[=] - -Request parameters: - -- notification-id: name of the notification (if not provided, all S3-compliant notifications on the bucket are deleted) +Detailed under: `Bucket Operations`_. .. note:: - - Notification deletion is an extension to the S3 notification API - - When the bucket is deleted, any notification defined on it is also deleted. - In this case, the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access), + - Notification creation will also create a subscription for pushing/pulling events + - The generated subscription's name will have the same as the notification Id, and could be used later to fetch and ack events with the subscription API. + - Notification deletion will deletes all generated subscriptions + - In case that bucket deletion implicitly deletes the notification, + the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access), and will have to be deleted explicitly with the subscription deletion API -Get/List Notifications -`````````````````````` - -Get a specific S3-compliant notification, or list all S3-compliant notifications defined on a bucket. - -:: - - GET /bucket?notification[=] - -Request parameters: - -- notification-id: name of the notification (if not provided, all S3-compliant notifications on the bucket are listed) - -Response is XML formatted: - -:: - - - - - - - - - -- Id: name of the notification -- Topic: topic ARN -- Event: either ``s3:ObjectCreated:*``, or ``s3:ObjectRemoved:*`` (multiple ``Event`` tags may be used) - - -.. note:: - - - Getting information on a specific notification is an extension to the S3 notification API - - When multiple notifications are fetched from the bucket, multiple ``NotificationConfiguration`` tags will be used Non S3-Compliant Notifications ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -330,7 +259,7 @@ This will create a publisher for a specific bucket into a topic. Request parameters: - topic-name: name of topic -- event: event type (string), one of: OBJECT_CREATE, OBJECT_DELETE +- event: event type (string), one of: ``OBJECT_CREATE``, ``OBJECT_DELETE``, ``DELETE_MARKER_CREATE`` Delete Notification Information ``````````````````````````````` @@ -564,7 +493,7 @@ the events will have the following event format (JSON): ]} - id: unique ID of the event, that could be used for acking -- event: either ``OBJECT_CREATE``, or ``OBJECT_DELETE`` +- event: one of: ``OBJECT_CREATE``, ``OBJECT_DELETE``, ``DELETE_MARKER_CREATE`` - timestamp: timestamp indicating when the event was sent - info.attrs.mtime: timestamp indicating when the event was triggered - info.bucket.bucket_id: id of the bucket @@ -588,3 +517,4 @@ Request parameters: .. _Multisite : ../multisite .. _Bucket Notification : ../notifications +.. _Bucket Operations: ../s3/bucketops diff --git a/doc/radosgw/s3-notification-compatibility.rst b/doc/radosgw/s3-notification-compatibility.rst index fe016a27b9e50..91d66a7078ced 100644 --- a/doc/radosgw/s3-notification-compatibility.rst +++ b/doc/radosgw/s3-notification-compatibility.rst @@ -79,14 +79,13 @@ Event Types +----------------------------------------------+-----------------+-------------------------------------------+ | ``s3:ObjectCreated:Copy`` | Supported | Supported at ``s3:ObjectCreated:*`` level | +----------------------------------------------+-----------------+-------------------------------------------+ -| ``s3:ObjectCreated:CompleteMultipartUpload`` | Not Supported | Supported at ``s3:ObjectCreated:*`` level | -| (an extension to AWS) | | | +| ``s3:ObjectCreated:CompleteMultipartUpload`` | Supported | Supported at ``s3:ObjectCreated:*`` level | +----------------------------------------------+-----------------+-------------------------------------------+ -| ``s3:ObjectRemoved:*`` | Supported | +| ``s3:ObjectRemoved:*`` | Supported | Supported only the specific events below | +----------------------------------------------+-----------------+-------------------------------------------+ -| ``s3:ObjectRemoved:Delete`` | Supported | supported at ``s3:ObjectRemoved:*`` level | +| ``s3:ObjectRemoved:Delete`` | Supported | +----------------------------------------------+-----------------+-------------------------------------------+ -| ``s3:ObjectRemoved:DeleteMarkerCreated`` | Supported at ``s3:ObjectRemoved:*`` level | +| ``s3:ObjectRemoved:DeleteMarkerCreated`` | Supported | +----------------------------------------------+-----------------+-------------------------------------------+ | ``s3:ObjectRestore:Post`` | Not applicable to Ceph | +----------------------------------------------+-----------------+-------------------------------------------+ diff --git a/doc/radosgw/s3/bucketops.rst b/doc/radosgw/s3/bucketops.rst index ffa14e8950d7b..25a23a2826a19 100644 --- a/doc/radosgw/s3/bucketops.rst +++ b/doc/radosgw/s3/bucketops.rst @@ -477,3 +477,162 @@ Response Entities | ``Years`` | Integer | The number of years specified for the default retention period. | No | +-----------------------------+-------------+----------------------------------------------------------------------------------------+----------+ +Create Notification +------------------- + +Create a publisher for a specific bucket into a topic. + +Syntax +~~~~~~ + +:: + + PUT /?notification HTTP/1.1 + + +Request Entities +~~~~~~~~~~~~~~~~ + +Parameters are XML encoded in the body of the request, in the following format: + +:: + + + + + + + + + ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ +| Name | Type | Description | Required | ++===============================+===========+======================================================================================+==========+ +| ``NotificationConfiguration`` | Container | Holding list of ``TopicConfiguration`` entities | Yes | ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ +| ``TopicConfiguration`` | Container | Holding ``Id``, ``Topic`` and list of ``Event`` entities | Yes | ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ +| ``Id`` | String | Name of the notification | Yes | ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ +| ``Topic`` | String | Topic ARN. Topic must be created beforehand | Yes | ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ +| ``Event`` | String | List of supported events see: `S3 Notification Compatibility`_. Multiple ``Event`` | No | +| | | entities can be used. If omitted, all events are handled | | ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ + +HTTP Response +~~~~~~~~~~~~~ + ++---------------+-----------------------+----------------------------------------------------------+ +| HTTP Status | Status Code | Description | ++===============+=======================+==========================================================+ +| ``400`` | MalformedXML | The XML is not well-formed | ++---------------+-----------------------+----------------------------------------------------------+ +| ``400`` | InvalidArgument | Missing Id; Missing/Invalid Topic ARN; Invalid Event | ++---------------+-----------------------+----------------------------------------------------------+ +| ``404`` | NoSuchBucket | The bucket does not exist | ++---------------+-----------------------+----------------------------------------------------------+ +| ``404`` | NoSuchKey | The topic does not exist | ++---------------+-----------------------+----------------------------------------------------------+ + + +Delete Notification +------------------- + +Delete a specific, or all, notifications from a bucket. + +.. note:: + + - Notification deletion is an extension to the S3 notification API + - When the bucket is deleted, any notification defined on it is also deleted + - Deleting an unkown notification (e.g. double delete) is not considered an error + +Syntax +~~~~~~ + +:: + + DELETE /bucket?notification[=] HTTP/1.1 + + +Parameters +~~~~~~~~~~ + ++------------------------+-----------+----------------------------------------------------------------------------------------+ +| Name | Type | Description | ++========================+===========+========================================================================================+ +| ``notification-id`` | String | Name of the notification. If not provided, all notifications on the bucket are deleted | ++------------------------+-----------+----------------------------------------------------------------------------------------+ + +HTTP Response +~~~~~~~~~~~~~ + ++---------------+-----------------------+----------------------------------------------------------+ +| HTTP Status | Status Code | Description | ++===============+=======================+==========================================================+ +| ``404`` | NoSuchBucket | The bucket does not exist | ++---------------+-----------------------+----------------------------------------------------------+ + +Get/List Notification +--------------------- + +Get a specific notification, or list all notifications configured on a bucket. + +Syntax +~~~~~~ + +:: + + GET /bucket?notification[=] HTTP/1.1 + + +Parameters +~~~~~~~~~~ + ++------------------------+-----------+----------------------------------------------------------------------------------------+ +| Name | Type | Description | ++========================+===========+========================================================================================+ +| ``notification-id`` | String | Name of the notification. If not provided, all notifications on the bucket are listed | ++------------------------+-----------+----------------------------------------------------------------------------------------+ + +Response Entities +~~~~~~~~~~~~~~~~~ + +Response is XML encoded in the body of the request, in the following format: + +:: + + + + + + + + + ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ +| Name | Type | Description | Required | ++===============================+===========+======================================================================================+==========+ +| ``NotificationConfiguration`` | Container | Holding list of ``TopicConfiguration`` entities | Yes | ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ +| ``TopicConfiguration`` | Container | Holding ``Id``, ``Topic`` and list of ``Event`` entities | Yes | ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ +| ``Id`` | String | Name of the notification | Yes | ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ +| ``Topic`` | String | Topic ARN | Yes | ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ +| ``Event`` | String | Handled event. Multiple ``Event`` entities may exist | Yes | ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ + +HTTP Response +~~~~~~~~~~~~~ + ++---------------+-----------------------+----------------------------------------------------------+ +| HTTP Status | Status Code | Description | ++===============+=======================+==========================================================+ +| ``404`` | NoSuchBucket | The bucket does not exist | ++---------------+-----------------------+----------------------------------------------------------+ +| ``404`` | NoSuchKey | The notification does not exist (if provided) | ++---------------+-----------------------+----------------------------------------------------------+ + +.. _S3 Notification Compatibility: ../s3-notification-compatibility diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc index 944fda8282cc5..ea5f0fd6f4263 100644 --- a/src/rgw/rgw_notify.cc +++ b/src/rgw/rgw_notify.cc @@ -84,6 +84,12 @@ int publish(const req_state* s, continue; } event_should_be_handled = true; + record.configurationId = topic_filter.s3_id; + ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id << + "' on topic: '" << topic_cfg.dest.arn_topic << + "' and bucket: '" << s->bucket.name << + "' (unique topic: '" << topic_cfg.name << + "') apply to event of type: '" << to_string(event_type) << "'" << dendl; try { // TODO add endpoint LRU cache const auto push_endpoint = RGWPubSubEndpoint::create(topic_cfg.dest.push_endpoint, diff --git a/src/rgw/rgw_notify_event_type.cc b/src/rgw/rgw_notify_event_type.cc index a95d348380537..10c77c2818455 100644 --- a/src/rgw/rgw_notify_event_type.cc +++ b/src/rgw/rgw_notify_event_type.cc @@ -16,6 +16,8 @@ namespace rgw::notify { return "s3:ObjectCreated:Post"; case ObjectCreatedCopy: return "s3:ObjectCreated:Copy"; + case ObjectCreatedCompleteMultipartUpload: + return "s3:ObjectCreated:CompleteMultipartUpload"; case ObjectRemoved: return "s3:ObjectRemoved:*"; case ObjectRemovedDelete: @@ -34,12 +36,13 @@ namespace rgw::notify { case ObjectCreatedPut: case ObjectCreatedPost: case ObjectCreatedCopy: + case ObjectCreatedCompleteMultipartUpload: return "OBJECT_CREATE"; - case ObjectRemoved: case ObjectRemovedDelete: return "OBJECT_DELETE"; case ObjectRemovedDeleteMarkerCreated: return "DELETE_MARKER_CREATE"; + case ObjectRemoved: case UnknownEvent: return "UNKNOWN_EVENT"; } @@ -55,9 +58,11 @@ namespace rgw::notify { return ObjectCreatedPost; if (s == "s3:ObjectCreated:Copy") return ObjectCreatedCopy; - if (s == "s3:ObjectRemoved:*" || s == "OBJECT_DELETE") + if (s == "s3:ObjectCreated:CompleteMultipartUpload") + return ObjectCreatedCompleteMultipartUpload; + if (s == "s3:ObjectRemoved:*") return ObjectRemoved; - if (s == "s3:ObjectRemoved:Delete") + if (s == "s3:ObjectRemoved:Delete" || s == "OBJECT_DELETE") return ObjectRemovedDelete; if (s == "s3:ObjectRemoved:DeleteMarkerCreated" || s == "DELETE_MARKER_CREATE") return ObjectRemovedDeleteMarkerCreated; diff --git a/src/rgw/rgw_notify_event_type.h b/src/rgw/rgw_notify_event_type.h index 2ac2d1f8c1f9d..0d86bf3f3219f 100644 --- a/src/rgw/rgw_notify_event_type.h +++ b/src/rgw/rgw_notify_event_type.h @@ -7,14 +7,15 @@ namespace rgw::notify { enum EventType { - ObjectCreated = 0xF, - ObjectCreatedPut = 0x1, - ObjectCreatedPost = 0x2, - ObjectCreatedCopy = 0x4, - ObjectRemoved = 0xF0, - ObjectRemovedDelete = 0x10, - ObjectRemovedDeleteMarkerCreated = 0x20, - UnknownEvent = 0x100 + ObjectCreated = 0xF, + ObjectCreatedPut = 0x1, + ObjectCreatedPost = 0x2, + ObjectCreatedCopy = 0x4, + ObjectCreatedCompleteMultipartUpload = 0x8, + ObjectRemoved = 0xF0, + ObjectRemovedDelete = 0x10, + ObjectRemovedDeleteMarkerCreated = 0x20, + UnknownEvent = 0x100 }; using EventTypeList = std::vector; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 238b8ba571f6d..cc61ebe81041c 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -4278,8 +4278,8 @@ void RGWPostObj::execute() return; } } while (is_next_file_to_upload()); - // send request to notification manager - const auto ret = rgw::notify::publish(s, real_time(), etag, rgw::notify::ObjectCreatedPost, store); + + const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed @@ -4705,9 +4705,10 @@ void RGWDeleteObj::execute() bool check_obj_lock = obj.key.have_instance() && s->bucket_info.obj_lock_enabled(); if (!s->object.empty()) { + op_ret = get_obj_attrs(store, s, obj, attrs); + if (need_object_expiration() || multipart_delete) { /* check if obj exists, read orig attrs */ - op_ret = get_obj_attrs(store, s, obj, attrs); if (op_ret < 0) { return; } @@ -4715,7 +4716,6 @@ void RGWDeleteObj::execute() if (check_obj_lock) { /* check if obj exists, read orig attrs */ - op_ret = get_obj_attrs(store, s, obj, attrs); if (op_ret < 0) { if (op_ret == -ENOENT) { /* object maybe delete_marker, skip check_obj_lock*/ @@ -4726,6 +4726,9 @@ void RGWDeleteObj::execute() } } + // ignore return value from get_obj_attrs in all other cases + op_ret = 0; + if (check_obj_lock) { auto aiter = attrs.find(RGW_ATTR_OBJECT_RETENTION); if (aiter != attrs.end()) { @@ -4828,9 +4831,10 @@ void RGWDeleteObj::execute() } else { op_ret = -EINVAL; } - // TODO: add etag calculation - std::string etag; - const auto ret = rgw::notify::publish(s, real_time(), etag, rgw::notify::ObjectRemovedDelete, store); + + const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), + delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete, + store); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed @@ -5142,7 +5146,6 @@ void RGWCopyObj::execute() this, s->yield); - // TODO: use s3:ObjectCreated:Copy const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; @@ -5820,6 +5823,14 @@ void RGWInitMultipart::execute() op_ret = obj_op.write_meta(bl.length(), 0, attrs, s->yield); } while (op_ret == -EEXIST); + + const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store); + if (ret < 0) { + ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; + // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed + // this should be global conf (probably returnign a different handler) + // so we don't need to read the configured values before we perform it + } } int RGWCompleteMultipart::verify_permission() @@ -6126,6 +6137,14 @@ void RGWCompleteMultipart::execute() } else { ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl; } + + const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store); + if (ret < 0) { + ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; + // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed + // this should be global conf (probably returnign a different handler) + // so we don't need to read the configured values before we perform it + } } int RGWCompleteMultipart::MPSerializer::try_lock( diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index dd825e0570b35..44292a3ce607b 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -160,6 +160,7 @@ private: CephContext* const cct; const std::string endpoint; const std::string topic; + const std::string exchange; amqp::connection_ptr_t conn; ack_level_t ack_level; std::string str_ack_level; @@ -271,8 +272,9 @@ public: CephContext* _cct) : cct(_cct), endpoint(_endpoint), - topic(_topic), - conn(amqp::connect(endpoint, get_exchange(args))) { + topic(_topic), + exchange(get_exchange(args)), + conn(amqp::connect(endpoint, exchange)) { if (!conn) { throw configuration_error("AMQP: failed to create connection to: " + endpoint); } @@ -393,6 +395,7 @@ public: std::string str("AMQP(0.9.1) Endpoint"); str += "\nURI: " + endpoint; str += "\nTopic: " + topic; + str += "\nExchange: " + exchange; str += "\nAck Level: " + str_ack_level; return str; } diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index 4a2e365fd2d74..e4bf0bf326f99 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -509,7 +509,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() { ups->remove_topic(unique_topic_name); return; } - ldout(s->cct, 20) << "successfully auto-generated notification for unique topic'" << unique_topic_name << "'" << dendl; + ldout(s->cct, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl; if (!push_only) { // generate the subscription with destination information from the original topic diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc index b1f250313814d..b198bb33b19f8 100644 --- a/src/rgw/rgw_sync_module_pubsub_rest.cc +++ b/src/rgw/rgw_sync_module_pubsub_rest.cc @@ -346,16 +346,14 @@ private: } std::string events_str = s->info.args.get("events", &exists); - if (exists) { - rgw::notify::from_string_list(events_str, events); - if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) { - ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl; - return -EINVAL; - } - } else { - // if no events are provided, we assume all events - events.push_back(rgw::notify::ObjectCreated); - events.push_back(rgw::notify::ObjectRemoved); + if (!exists) { + // if no events are provided, we notify on all of them + events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE"; + } + rgw::notify::from_string_list(events_str, events); + if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) { + ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl; + return -EINVAL; } return notif_bucket_path(s->object.name, bucket_name); } diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 786a1c6617e76..a3b7807582e45 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -20,7 +20,7 @@ from .tests import get_realm, \ gen_bucket_name, \ get_user, \ get_tenant -from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info +from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info, delete_all_s3_topics from multisite import User from nose import SkipTest from nose.tools import assert_not_equal, assert_equal @@ -152,22 +152,8 @@ class AMQPReceiver(object): def __init__(self, exchange, topic): import pika hostname = get_ip() - retries = 20 - connect_ok = False - while retries > 0: - try: - connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port)) - connect_ok = True - break - except Exception as error: - retries -= 1 - print 'AMQP receiver failed to connect (try %d): %s' % (10 - retries, str(error)) - log.info('AMQP receiver failed to connect (try %d): %s', 10 - retries, str(error)) - time.sleep(2) - - if connect_ok == False: - raise error - + connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port)) + self.channel = connection.channel() self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True) result = self.channel.queue_declare('', exclusive=True) @@ -177,10 +163,11 @@ class AMQPReceiver(object): on_message_callback=self.on_message, auto_ack=True) self.events = [] + self.topic = topic def on_message(self, ch, method, properties, body): """callback invoked when a new message arrive on the topic""" - log.info('AMQP received event: %s', body) + log.info('AMQP received event for topic %s:\n %s', self.topic, body) self.events.append(json.loads(body)) # TODO create a base class for the AMQP and HTTP cases @@ -194,6 +181,11 @@ class AMQPReceiver(object): verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions) self.events = [] + def get_and_reset_events(self): + tmp = self.events + self.events = [] + return tmp + def amqp_receiver_thread_runner(receiver): """main thread function for the amqp receiver""" @@ -613,6 +605,9 @@ def test_ps_s3_topic_on_master(): zonegroup = realm.master_zonegroup() bucket_name = gen_bucket_name() topic_name = bucket_name + TOPIC_SUFFIX + + # clean all topics + delete_all_s3_topics(zones[0].conn, zonegroup.name) # create s3 topics endpoint_address = 'amqp://127.0.0.1:7001' @@ -635,44 +630,36 @@ def test_ps_s3_topic_on_master(): assert_equal(topic_arn, 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3') - try: - # get topic 3 - result, status = topic_conf3.get_config() - assert_equal(status, 200) - assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn']) - assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress']) - # Note that endpoint args may be ordered differently in the result - - # delete topic 1 - result = topic_conf1.del_config() - assert_equal(status, 200) - - # try to get a deleted topic (1) - _, status = topic_conf1.get_config() - assert_equal(status, 404) - - # get the remaining 2 topics - result = topic_conf1.get_list() - assert_equal(len(result['Topics']), 2) - - # delete topics - result = topic_conf2.del_config() - # TODO: should be 200OK - # assert_equal(status, 200) - result = topic_conf3.del_config() - # TODO: should be 200OK - # assert_equal(status, 200) - - # get topic list, make sure it is empty - result = topic_conf1.get_list() - assert_equal(len(result['Topics']), 0) - except AssertionError as e: - # topics are stored at user level, so cleanup is needed - # to prevent failures in consequent runs - topic_conf1.del_config() - topic_conf2.del_config() - topic_conf3.del_config() - raise e + # get topic 3 + result, status = topic_conf3.get_config() + assert_equal(status, 200) + assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn']) + assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress']) + # Note that endpoint args may be ordered differently in the result + + # delete topic 1 + result = topic_conf1.del_config() + assert_equal(status, 200) + + # try to get a deleted topic + _, status = topic_conf1.get_config() + assert_equal(status, 404) + + # get the remaining 2 topics + result = topic_conf1.get_list() + assert_equal(len(result['Topics']), 2) + + # delete topics + result = topic_conf2.del_config() + # TODO: should be 200OK + # assert_equal(status, 200) + result = topic_conf3.del_config() + # TODO: should be 200OK + # assert_equal(status, 200) + + # get topic list, make sure it is empty + result = topic_conf1.get_list() + assert_equal(len(result['Topics']), 0) def test_ps_s3_notification_on_master(): @@ -779,8 +766,9 @@ def test_ps_s3_notification_errors_on_master(): assert False, 'missing notification name is expected to fail' # create s3 notification with invalid topic ARN + invalid_topic_arn = 'kaboom' topic_conf_list = [{'Id': notification_name, - 'TopicArn': 'kaboom', + 'TopicArn': invalid_topic_arn, 'Events': ['s3:ObjectCreated:Put'] }] s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list) @@ -791,6 +779,20 @@ def test_ps_s3_notification_errors_on_master(): else: assert False, 'invalid ARN is expected to fail' + # create s3 notification with unknown topic ARN + invalid_topic_arn = 'arn:aws:sns:a::kaboom' + topic_conf_list = [{'Id': notification_name, + 'TopicArn': invalid_topic_arn , + 'Events': ['s3:ObjectCreated:Put'] + }] + s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list) + try: + _, _ = s3_notification_conf.set_config() + except Exception as error: + print str(error) + ' - is expected' + else: + assert False, 'unknown topic is expected to fail' + # create s3 notification with wrong bucket topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, @@ -804,8 +806,16 @@ def test_ps_s3_notification_errors_on_master(): else: assert False, 'unknown bucket is expected to fail' - # cleanup topic_conf.del_config() + + status = topic_conf.del_config() + # deleting an unknown notification is not considered an error + assert_equal(status, 200) + + _, status = topic_conf.get_config() + assert_equal(status, 404) + + # cleanup # delete the bucket zones[0].delete_bucket(bucket_name) @@ -905,7 +915,7 @@ def test_ps_s3_notification_push_amqp_on_master(): start_time = time.time() for i in range(number_of_objects): key = bucket.new_key(str(i)) - content = 'bar' + content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) @@ -1596,7 +1606,7 @@ def test_ps_s3_creation_triggers_on_master(): # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn, - 'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy', 's3:ObjectCreated:Post'] + 'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy'] }] s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list) @@ -1636,55 +1646,178 @@ def test_ps_s3_creation_triggers_on_master(): clean_rabbitmq(proc) +def test_ps_s3_multipart_on_master(): + """ test multipart object upload on master""" + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + hostname = get_ip() + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') + zones, _ = init_env(require_ps=False) + realm = get_realm() + zonegroup = realm.master_zonegroup() + + # create bucket + bucket_name = gen_bucket_name() + bucket = zones[0].create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # start amqp receivers + exchange = 'ex1' + task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1') + task1.start() + task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2') + task2.start() + task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3') + task3.start() + + # create s3 topics + endpoint_address = 'amqp://' + hostname + endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker' + topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args) + topic_arn1 = topic_conf1.set_config() + topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args) + topic_arn2 = topic_conf2.set_config() + topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args) + topic_arn3 = topic_conf3.set_config() + + # create s3 notifications + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, + 'Events': ['s3:ObjectCreated:*'] + }, + {'Id': notification_name+'_2', 'TopicArn': topic_arn2, + 'Events': ['s3:ObjectCreated:Post'] + }, + {'Id': notification_name+'_3', 'TopicArn': topic_arn3, + 'Events': ['s3:ObjectCreated:CompleteMultipartUpload'] + }] + s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the bucket using multi-part upload + fp = tempfile.TemporaryFile(mode='w+b') + content = bytearray(os.urandom(1024*1024)) + fp.write(content) + fp.flush() + fp.seek(0) + uploader = bucket.initiate_multipart_upload('multipart') + uploader.upload_part_from_file(fp, 1) + uploader.complete_upload() + fp.close() + + print 'wait for 5sec for the messages...' + time.sleep(5) + + # check amqp receiver + events = receiver1.get_and_reset_events() + assert_equal(len(events), 3) + + events = receiver2.get_and_reset_events() + assert_equal(len(events), 1) + assert_equal(events[0]['eventName'], 's3:ObjectCreated:Post') + assert_equal(events[0]['s3']['configurationId'], notification_name+'_2') + + events = receiver3.get_and_reset_events() + assert_equal(len(events), 1) + assert_equal(events[0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload') + assert_equal(events[0]['s3']['configurationId'], notification_name+'_3') + + # cleanup + stop_amqp_receiver(receiver1, task1) + stop_amqp_receiver(receiver2, task2) + stop_amqp_receiver(receiver3, task3) + s3_notification_conf.del_config() + topic_conf1.del_config() + topic_conf2.del_config() + topic_conf3.del_config() + for key in bucket.list(): + key.delete() + # delete the bucket + zones[0].delete_bucket(bucket_name) + clean_rabbitmq(proc) + + def test_ps_versioned_deletion(): """ test notification of deletion markers """ zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX - # create topic - topic_conf = PSTopic(ps_zones[0].conn, topic_name) - topic_conf.set_config() + # create topics + topic_conf1 = PSTopic(ps_zones[0].conn, topic_name+'_1') + _, status = topic_conf1.set_config() + assert_equal(status/100, 2) + topic_conf2 = PSTopic(ps_zones[0].conn, topic_name+'_2') + _, status = topic_conf2.set_config() + assert_equal(status/100, 2) + # create bucket on the first of the rados zones bucket = zones[0].create_bucket(bucket_name) bucket.configure_versioning(True) + # wait for sync zone_meta_checkpoint(ps_zones[0].zone) + # create notifications - # TODO use 'DELETE_MARKER_CREATE' - event_type = 'OBJECT_DELETE' - notification_conf = PSNotification(ps_zones[0].conn, bucket_name, - topic_name, event_type) - _, status = notification_conf.set_config() + event_type1 = 'OBJECT_DELETE' + notification_conf1 = PSNotification(ps_zones[0].conn, bucket_name, + topic_name+'_1', + event_type1) + _, status = notification_conf1.set_config() + assert_equal(status/100, 2) + event_type2 = 'DELETE_MARKER_CREATE' + notification_conf2 = PSNotification(ps_zones[0].conn, bucket_name, + topic_name+'_2', + event_type2) + _, status = notification_conf2.set_config() assert_equal(status/100, 2) - # create subscription - sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX, - topic_name) - _, status = sub_conf.set_config() + + # create subscriptions + sub_conf1 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_1', + topic_name+'_1') + _, status = sub_conf1.set_config() + assert_equal(status/100, 2) + sub_conf2 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_2', + topic_name+'_2') + _, status = sub_conf2.set_config() assert_equal(status/100, 2) + # create objects in the bucket key = bucket.new_key('foo') key.set_contents_from_string('bar') v1 = key.version_id key.set_contents_from_string('kaboom') v2 = key.version_id + # create deletion marker + delete_marker_key = bucket.delete_key(key.name) + # wait for sync zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) - # set delete markers + + # delete the deletion marker + delete_marker_key.delete() + # delete versions bucket.delete_key(key.name, version_id=v2) bucket.delete_key(key.name, version_id=v1) + # wait for sync zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) # get the delete events from the subscription - result, _ = sub_conf.get_events() + result, _ = sub_conf1.get_events() parsed_result = json.loads(result) for event in parsed_result['events']: log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') - assert_equal(str(event['event']), event_type) + assert_equal(str(event['event']), event_type1) - # TODO: verify we have exactly 2 events - assert len(parsed_result['events']) >= 2 + result, _ = sub_conf2.get_events() + parsed_result = json.loads(result) + for event in parsed_result['events']: + log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') + assert_equal(str(event['event']), event_type2) # cleanup # follwing is needed for the cleanup in the case of 3-zones @@ -1697,9 +1830,12 @@ def test_ps_versioned_deletion(): zones[0].delete_bucket(bucket_name) except: log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket') - sub_conf.del_config() - notification_conf.del_config() - topic_conf.del_config() + sub_conf1.del_config() + sub_conf2.del_config() + notification_conf1.del_config() + notification_conf2.del_config() + topic_conf1.del_config() + topic_conf2.del_config() def test_ps_s3_metadata_on_master(): @@ -1791,8 +1927,14 @@ def test_ps_s3_versioned_deletion_on_master(): # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code - topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn, - 'Events': ['s3:ObjectRemoved:Delete', 's3:ObjectCreated:Put'] + topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn, + 'Events': ['s3:ObjectRemoved:*'] + }, + {'Id': notification_name+'_2', 'TopicArn': topic_arn, + 'Events': ['s3:ObjectRemoved:DeleteMarkerCreated'] + }, + {'Id': notification_name+'_3', 'TopicArn': topic_arn, + 'Events': ['s3:ObjectRemoved:Delete'] }] s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() @@ -1804,24 +1946,37 @@ def test_ps_s3_versioned_deletion_on_master(): v1 = key.version_id key.set_contents_from_string('kaboom') v2 = key.version_id - keys = list(bucket.list()) + # create delete marker (non versioned deletion) + delete_marker_key = bucket.delete_key(key.name) - print 'wait for 5sec for the messages...' - time.sleep(5) + time.sleep(1) - # check amqp receiver - # Note: should not do exact match in case of versioned objects - receiver.verify_s3_events(keys, exact_match=False) - # set delete markers + # versioned deletion bucket.delete_key(key.name, version_id=v2) bucket.delete_key(key.name, version_id=v1) + delete_marker_key.delete() print 'wait for 5sec for the messages...' time.sleep(5) # check amqp receiver - # Note: should not do exact match in case of versioned objects - receiver.verify_s3_events(keys, exact_match=False, deletions=True) + events = receiver.get_and_reset_events() + delete_events = 0 + delete_marker_create_events = 0 + for event in events: + if event['eventName'] == 's3:ObjectRemoved:Delete': + delete_events += 1 + assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3'] + if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated': + delete_marker_create_events += 1 + assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2'] + + # 3 key versions were deleted (v1, v2 and the deletion marker) + # notified over the same topic via 2 notifications (1,3) + assert_equal(delete_events, 3*2) + # 1 deletion marker was created + # notified over the same topic over 2 notifications (1,2) + assert_equal(delete_marker_create_events, 1*2) # cleanup stop_amqp_receiver(receiver, task) diff --git a/src/test/rgw/rgw_multi/zone_ps.py b/src/test/rgw/rgw_multi/zone_ps.py index d2aa5d346cebb..8ee49f2036b16 100644 --- a/src/test/rgw/rgw_multi/zone_ps.py +++ b/src/test/rgw/rgw_multi/zone_ps.py @@ -130,6 +130,23 @@ class PSTopic: return self.send_request('GET', get_list=True) +def delete_all_s3_topics(conn, region): + try: + client = boto3.client('sns', + endpoint_url='http://'+conn.host+':'+str(conn.port), + aws_access_key_id=conn.aws_access_key_id, + aws_secret_access_key=conn.aws_secret_access_key, + region_name=region, + config=Config(signature_version='s3')) + + topics = client.list_topics()['Topics'] + for topic in topics: + print 'topic cleanup, deleting: ' + topic['TopicArn'] + assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200 + except: + print 'failed to do topic cleanup. if there are topics they may need to be manually deleted' + + class PSTopicS3: """class to set/list/get/delete a topic POST ?Action=CreateTopic&Name=&push-endpoint=&[=...]] -- 2.39.5