]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: support deletion markers and multipart upload
authorYuval Lifshitz <yuvalif@yahoo.com>
Mon, 2 Sep 2019 16:24:46 +0000 (19:24 +0300)
committerYuval Lifshitz <yuvalif@yahoo.com>
Tue, 10 Sep 2019 15:54:05 +0000 (18:54 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
13 files changed:
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
doc/radosgw/s3-notification-compatibility.rst
doc/radosgw/s3/bucketops.rst
src/rgw/rgw_notify.cc
src/rgw/rgw_notify_event_type.cc
src/rgw/rgw_notify_event_type.h
src/rgw/rgw_op.cc
src/rgw/rgw_pubsub_push.cc
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_sync_module_pubsub_rest.cc
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py

index 9f4f68e5c92b06963bdc8b897cc3e2eb79690134..da8ad68f97c38a65381fccdfdb93de272e6d2eeb 100644 (file)
@@ -205,83 +205,13 @@ Response will have the following format:
 Notifications
 ~~~~~~~~~~~~~
 
-Create a Notification
-`````````````````````
-
-This will create a publisher for a specific bucket into a topic.
-
-::
-
-   PUT /<bucket name>?notification
-
-Request parameters are encoded in XML in the body of the request, with the following format:
-
-::
-
-   <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
-       <TopicConfiguration>
-           <Id></Id>
-           <Topic></Topic>
-           <Event></Event>
-       </TopicConfiguration>
-   </NotificationConfiguration>
-
-- Id: name of the notification
-- Topic: topic ARN
-- Event: either ``s3:ObjectCreated:*``, or ``s3:ObjectRemoved:*`` (multiple ``Event`` tags may be used)
-
-Delete Notification
-```````````````````
-
-Delete a specific, or all, notifications from a bucket.
-
-::
-
-   DELETE /bucket?notification[=<notification-id>]
-
-Request parameters:
-
-- notification-id: name of the notification (if not provided, all notifications on the bucket are deleted)
+Detailed under: `Bucket Operations`_.
 
 .. note:: 
 
-    - Notification deletion is an extension to the S3 notification API
-    - When the bucket is deleted, any notification defined on it is also deleted 
-
-Get/List Notifications
-``````````````````````
-
-Get a specific notification, or list all notifications defined on a bucket.
-
-::
-
-   GET /bucket?notification[=<notification-id>]
-
-Request parameters:
-
-- notification-id: name of the notification (if not provided, all notifications on the bucket are listed)
-
-Response is XML formatted:
-
-::
-
-   <NotificationConfiguration>
-       <TopicConfiguration>
-           <Id></Id>
-           <Topic></Topic>
-           <Event></Event>
-       </TopicConfiguration>
-   </NotificationConfiguration>
-
-- Id: name of the notification
-- Topic: topic ARN
-- Event: for list of supported events see: `S3 Notification Compatibility`_ (to support multiple types, any combination of wildcard events and ``Event`` tags may be used)
-
-
-.. note::
-
-    - Getting information on a specific notification is an extension to the S3 notification API
-    - When multiple notifications are fetched from the bucket, multiple ``NotificationConfiguration`` tags will be used
+    - "Abort Multipart Upload" request does not emit a notification
+    - "Delete Multiple Objects" request does not emit a notification
+    - Both "Initiate Multipart Upload" and "POST Object" requests will emit an ``s3:ObjectCreated:Post`` notification
 
 
 Events
@@ -356,3 +286,4 @@ pushed or pulled using the pubsub sync module.
 .. _PubSub Module : ../pubsub-module
 .. _S3 Notification Compatibility: ../s3-notification-compatibility
 .. _AWS Create Topic: https://docs.aws.amazon.com/sns/latest/api/API_CreateTopic.html
+.. _Bucket Operations: ../s3/bucketops
index 7dc5b0e91350160c7fe6c8dfd5719de76b85d3f1..6f98f63f8919338b7f8e7d24d122027ee0e08272 100644 (file)
@@ -232,88 +232,17 @@ List all topics that user defined.
 S3-Compliant Notifications
 ~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-Create a Notification
-`````````````````````
-
-This will create a publisher for a specific bucket into a topic, and a subscription
-for pushing/pulling events.
-The subscription's name will have the same as the notification Id, and could be used later to fetch
-and ack events with the subscription API.
-
-::
-
-   PUT /<bucket name>?notification
-
-Request parameters are encoded in XML in the body of the request, with the following format:
-
-::
-
-   <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
-       <TopicConfiguration>
-           <Id></Id>
-           <Topic></Topic>
-           <Event></Event>
-       </TopicConfiguration>
-   </NotificationConfiguration>
-
-- Id: name of the notification
-- Topic: topic ARN
-- Event: either ``s3:ObjectCreated:*``, or ``s3:ObjectRemoved:*`` (multiple ``Event`` tags may be used)
-
-Delete Notification
-```````````````````
-
-Delete a specific, or all, S3-compliant notifications from a bucket. Associated subscription will also be deleted.
-
-::
-
-   DELETE /bucket?notification[=<notification-id>]
-
-Request parameters:
-
-- notification-id: name of the notification (if not provided, all S3-compliant notifications on the bucket are deleted)
+Detailed under: `Bucket Operations`_.
 
 .. note:: 
 
-    - Notification deletion is an extension to the S3 notification API
-    - When the bucket is deleted, any notification defined on it is also deleted. 
-      In this case, the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access), 
+    - Notification creation will also create a subscription for pushing/pulling events
+    - The generated subscription's name will have the same as the notification Id, and could be used later to fetch and ack events with the subscription API.
+    - Notification deletion will deletes all generated subscriptions
+    - In case that bucket deletion implicitly deletes the notification, 
+      the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access),
       and will have to be deleted explicitly with the subscription deletion API
 
-Get/List Notifications
-``````````````````````
-
-Get a specific S3-compliant notification, or list all S3-compliant notifications defined on a bucket.
-
-::
-
-   GET /bucket?notification[=<notification-id>]
-
-Request parameters:
-
-- notification-id: name of the notification (if not provided, all S3-compliant notifications on the bucket are listed)
-
-Response is XML formatted:
-
-::
-
-   <NotificationConfiguration>
-       <TopicConfiguration>
-           <Id></Id>
-           <Topic></Topic>
-           <Event></Event>
-       </TopicConfiguration>
-   </NotificationConfiguration>
-
-- Id: name of the notification
-- Topic: topic ARN
-- Event: either ``s3:ObjectCreated:*``, or ``s3:ObjectRemoved:*`` (multiple ``Event`` tags may be used)
-
-
-.. note::
-
-    - Getting information on a specific notification is an extension to the S3 notification API
-    - When multiple notifications are fetched from the bucket, multiple ``NotificationConfiguration`` tags will be used
 
 Non S3-Compliant Notifications
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -330,7 +259,7 @@ This will create a publisher for a specific bucket into a topic.
 Request parameters:
 
 - topic-name: name of topic
-- event: event type (string), one of: OBJECT_CREATE, OBJECT_DELETE 
+- event: event type (string), one of: ``OBJECT_CREATE``, ``OBJECT_DELETE``, ``DELETE_MARKER_CREATE``
  
 Delete Notification Information
 ```````````````````````````````
@@ -564,7 +493,7 @@ the events will have the following event format (JSON):
    ]}
 
 - id: unique ID of the event, that could be used for acking
-- event: either ``OBJECT_CREATE``, or ``OBJECT_DELETE``
+- event: one of: ``OBJECT_CREATE``, ``OBJECT_DELETE``, ``DELETE_MARKER_CREATE``
 - timestamp: timestamp indicating when the event was sent
 - info.attrs.mtime: timestamp indicating when the event was triggered
 - info.bucket.bucket_id: id of the bucket
@@ -588,3 +517,4 @@ Request parameters:
 
 .. _Multisite : ../multisite
 .. _Bucket Notification : ../notifications
+.. _Bucket Operations: ../s3/bucketops
index fe016a27b9e508e47ad0b2ca65e8caf0d62e16d5..91d66a7078ced956915b201cfd363f5fb9dbb409 100644 (file)
@@ -79,14 +79,13 @@ Event Types
 +----------------------------------------------+-----------------+-------------------------------------------+
 | ``s3:ObjectCreated:Copy``                    | Supported       | Supported at ``s3:ObjectCreated:*`` level |
 +----------------------------------------------+-----------------+-------------------------------------------+
-| ``s3:ObjectCreated:CompleteMultipartUpload`` | Not Supported   | Supported at ``s3:ObjectCreated:*`` level |
-| (an extension to AWS)                        |                 |                                           |
+| ``s3:ObjectCreated:CompleteMultipartUpload`` | Supported       | Supported at ``s3:ObjectCreated:*`` level |
 +----------------------------------------------+-----------------+-------------------------------------------+
-| ``s3:ObjectRemoved:*``                       | Supported                                                   |
+| ``s3:ObjectRemoved:*``                       | Supported       | Supported only the specific events below  |
 +----------------------------------------------+-----------------+-------------------------------------------+
-| ``s3:ObjectRemoved:Delete``                  | Supported       | supported at ``s3:ObjectRemoved:*`` level |
+| ``s3:ObjectRemoved:Delete``                  | Supported                                                   |
 +----------------------------------------------+-----------------+-------------------------------------------+
-| ``s3:ObjectRemoved:DeleteMarkerCreated``     | Supported at ``s3:ObjectRemoved:*`` level                   |
+| ``s3:ObjectRemoved:DeleteMarkerCreated``     | Supported                                                   |
 +----------------------------------------------+-----------------+-------------------------------------------+
 | ``s3:ObjectRestore:Post``                    | Not applicable to Ceph                                      |
 +----------------------------------------------+-----------------+-------------------------------------------+
index ffa14e8950d7b1f0f2e12043d890174d31379fe7..25a23a2826a199e96d8f7a968796759b0d66afee 100644 (file)
@@ -477,3 +477,162 @@ Response Entities
 | ``Years``                   | Integer     | The number of years specified for the default retention period.                        |   No     |
 +-----------------------------+-------------+----------------------------------------------------------------------------------------+----------+
 
+Create Notification
+-------------------
+
+Create a publisher for a specific bucket into a topic.
+
+Syntax
+~~~~~~
+
+::
+
+    PUT /<bucket name>?notification HTTP/1.1
+
+
+Request Entities
+~~~~~~~~~~~~~~~~
+
+Parameters are XML encoded in the body of the request, in the following format:
+
+::
+
+   <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+       <TopicConfiguration>
+           <Id></Id>
+           <Topic></Topic>
+           <Event></Event>
+       </TopicConfiguration>
+   </NotificationConfiguration>
+
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| Name                          | Type      | Description                                                                          | Required |
++===============================+===========+======================================================================================+==========+
+| ``NotificationConfiguration`` | Container | Holding list of ``TopicConfiguration`` entities                                      | Yes      |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``TopicConfiguration``        | Container | Holding ``Id``, ``Topic`` and list of ``Event`` entities                             | Yes      |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Id``                        | String    | Name of the notification                                                             | Yes      |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Topic``                     | String    | Topic ARN. Topic must be created beforehand                                          | Yes      |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Event``                     | String    | List of supported events see: `S3 Notification Compatibility`_.  Multiple ``Event``  | No       |
+|                               |           | entities can be used. If omitted, all events are handled                             |          |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+
+HTTP Response
+~~~~~~~~~~~~~
+
++---------------+-----------------------+----------------------------------------------------------+
+| HTTP Status   | Status Code           | Description                                              |
++===============+=======================+==========================================================+
+| ``400``       | MalformedXML          | The XML is not well-formed                               |
++---------------+-----------------------+----------------------------------------------------------+
+| ``400``       | InvalidArgument       | Missing Id; Missing/Invalid Topic ARN; Invalid Event     |
++---------------+-----------------------+----------------------------------------------------------+
+| ``404``       | NoSuchBucket          | The bucket does not exist                                |
++---------------+-----------------------+----------------------------------------------------------+
+| ``404``       | NoSuchKey             | The topic does not exist                                 |
++---------------+-----------------------+----------------------------------------------------------+
+
+
+Delete Notification
+-------------------
+
+Delete a specific, or all, notifications from a bucket.
+
+.. note:: 
+
+    - Notification deletion is an extension to the S3 notification API
+    - When the bucket is deleted, any notification defined on it is also deleted 
+    - Deleting an unkown notification (e.g. double delete) is not considered an error
+
+Syntax
+~~~~~~
+
+::
+
+    DELETE /bucket?notification[=<notification-id>] HTTP/1.1
+
+
+Parameters
+~~~~~~~~~~
+
++------------------------+-----------+----------------------------------------------------------------------------------------+
+| Name                   | Type      | Description                                                                            |
++========================+===========+========================================================================================+
+| ``notification-id``    | String    | Name of the notification. If not provided, all notifications on the bucket are deleted |
++------------------------+-----------+----------------------------------------------------------------------------------------+
+
+HTTP Response
+~~~~~~~~~~~~~
+
++---------------+-----------------------+----------------------------------------------------------+
+| HTTP Status   | Status Code           | Description                                              |
++===============+=======================+==========================================================+
+| ``404``       | NoSuchBucket          | The bucket does not exist                                |
++---------------+-----------------------+----------------------------------------------------------+
+
+Get/List Notification
+---------------------
+
+Get a specific notification, or list all notifications configured on a bucket.
+
+Syntax
+~~~~~~
+
+::
+
+    GET /bucket?notification[=<notification-id>] HTTP/1.1 
+
+
+Parameters
+~~~~~~~~~~
+
++------------------------+-----------+----------------------------------------------------------------------------------------+
+| Name                   | Type      | Description                                                                            |
++========================+===========+========================================================================================+
+| ``notification-id``    | String    | Name of the notification. If not provided, all notifications on the bucket are listed  |
++------------------------+-----------+----------------------------------------------------------------------------------------+
+
+Response Entities
+~~~~~~~~~~~~~~~~~
+
+Response is XML encoded in the body of the request, in the following format:
+
+::
+
+   <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+       <TopicConfiguration>
+           <Id></Id>
+           <Topic></Topic>
+           <Event></Event>
+       </TopicConfiguration>
+   </NotificationConfiguration>
+
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| Name                          | Type      | Description                                                                          | Required |
++===============================+===========+======================================================================================+==========+
+| ``NotificationConfiguration`` | Container | Holding list of ``TopicConfiguration`` entities                                      | Yes      |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``TopicConfiguration``        | Container | Holding ``Id``, ``Topic`` and list of ``Event`` entities                             | Yes      |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Id``                        | String    | Name of the notification                                                             | Yes      |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Topic``                     | String    | Topic ARN                                                                            | Yes      |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Event``                     | String    | Handled event. Multiple ``Event`` entities may exist                                 | Yes      |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+
+HTTP Response
+~~~~~~~~~~~~~
+
++---------------+-----------------------+----------------------------------------------------------+
+| HTTP Status   | Status Code           | Description                                              |
++===============+=======================+==========================================================+
+| ``404``       | NoSuchBucket          | The bucket does not exist                                |
++---------------+-----------------------+----------------------------------------------------------+
+| ``404``       | NoSuchKey             | The notification does not exist (if provided)            |
++---------------+-----------------------+----------------------------------------------------------+
+
+.. _S3 Notification Compatibility: ../s3-notification-compatibility
index 944fda8282cc5b0221af5d9c819557ba1508341f..ea5f0fd6f42635b605803148c707ea67b78ee9fb 100644 (file)
@@ -84,6 +84,12 @@ int publish(const req_state* s,
             continue;
         }
         event_should_be_handled = true;
+        record.configurationId = topic_filter.s3_id;
+        ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id << 
+            "' on topic: '" << topic_cfg.dest.arn_topic << 
+            "' and bucket: '" << s->bucket.name << 
+            "' (unique topic: '" << topic_cfg.name <<
+            "') apply to event of type: '" << to_string(event_type) << "'" << dendl;
         try {
             // TODO add endpoint LRU cache
             const auto push_endpoint = RGWPubSubEndpoint::create(topic_cfg.dest.push_endpoint, 
index a95d3483805371e4e88756a809e60959f8bc2d44..10c77c2818455d3bdf82d2d8700533e9afdfb3a2 100644 (file)
@@ -16,6 +16,8 @@ namespace rgw::notify {
         return "s3:ObjectCreated:Post";
       case ObjectCreatedCopy:
         return "s3:ObjectCreated:Copy";
+      case ObjectCreatedCompleteMultipartUpload:
+        return "s3:ObjectCreated:CompleteMultipartUpload";
       case ObjectRemoved:
         return "s3:ObjectRemoved:*";
       case ObjectRemovedDelete:
@@ -34,12 +36,13 @@ namespace rgw::notify {
       case ObjectCreatedPut:
       case ObjectCreatedPost:
       case ObjectCreatedCopy:
+      case ObjectCreatedCompleteMultipartUpload:
         return "OBJECT_CREATE";
-      case ObjectRemoved:
       case ObjectRemovedDelete:
         return "OBJECT_DELETE";
       case ObjectRemovedDeleteMarkerCreated:
         return "DELETE_MARKER_CREATE";
+      case ObjectRemoved:
       case UnknownEvent:
         return "UNKNOWN_EVENT";
     }
@@ -55,9 +58,11 @@ namespace rgw::notify {
         return ObjectCreatedPost;
     if (s == "s3:ObjectCreated:Copy")
         return ObjectCreatedCopy;
-    if (s == "s3:ObjectRemoved:*" || s == "OBJECT_DELETE")
+    if (s == "s3:ObjectCreated:CompleteMultipartUpload")
+        return ObjectCreatedCompleteMultipartUpload;
+    if (s == "s3:ObjectRemoved:*")
         return ObjectRemoved;
-    if (s == "s3:ObjectRemoved:Delete")
+    if (s == "s3:ObjectRemoved:Delete" || s == "OBJECT_DELETE")
         return ObjectRemovedDelete;
     if (s == "s3:ObjectRemoved:DeleteMarkerCreated" || s == "DELETE_MARKER_CREATE")
         return ObjectRemovedDeleteMarkerCreated;
index 2ac2d1f8c1f9d7223405e382c39bccc214e23ac5..0d86bf3f3219f3c81fe37ab742c59bd1dd55bbf8 100644 (file)
@@ -7,14 +7,15 @@
 
 namespace rgw::notify {
   enum EventType {
-    ObjectCreated       = 0xF,
-    ObjectCreatedPut    = 0x1,
-    ObjectCreatedPost   = 0x2,
-    ObjectCreatedCopy   = 0x4,
-    ObjectRemoved       = 0xF0,
-    ObjectRemovedDelete = 0x10,
-    ObjectRemovedDeleteMarkerCreated = 0x20,
-    UnknownEvent = 0x100
+    ObjectCreated                        = 0xF,
+    ObjectCreatedPut                     = 0x1,
+    ObjectCreatedPost                    = 0x2,
+    ObjectCreatedCopy                    = 0x4,
+    ObjectCreatedCompleteMultipartUpload = 0x8,
+    ObjectRemoved                        = 0xF0,
+    ObjectRemovedDelete                  = 0x10,
+    ObjectRemovedDeleteMarkerCreated     = 0x20,
+    UnknownEvent                         = 0x100
   };
 
   using EventTypeList = std::vector<EventType>;
index 238b8ba571f6dca58ea1d7b9061f04d2c062e9a6..cc61ebe81041c18f9f5348816b203fb2a8703482 100644 (file)
@@ -4278,8 +4278,8 @@ void RGWPostObj::execute()
       return;
     }
   } while (is_next_file_to_upload());
-  // send request to notification manager
-  const auto ret = rgw::notify::publish(s, real_time(), etag, rgw::notify::ObjectCreatedPost, store);
+
+  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -4705,9 +4705,10 @@ void RGWDeleteObj::execute()
   bool check_obj_lock = obj.key.have_instance() && s->bucket_info.obj_lock_enabled();
 
   if (!s->object.empty()) {
+    op_ret = get_obj_attrs(store, s, obj, attrs);
+
     if (need_object_expiration() || multipart_delete) {
       /* check if obj exists, read orig attrs */
-      op_ret = get_obj_attrs(store, s, obj, attrs);
       if (op_ret < 0) {
         return;
       }
@@ -4715,7 +4716,6 @@ void RGWDeleteObj::execute()
 
     if (check_obj_lock) {
       /* check if obj exists, read orig attrs */
-      op_ret = get_obj_attrs(store, s, obj, attrs);
       if (op_ret < 0) {
         if (op_ret == -ENOENT) {
           /* object maybe delete_marker, skip check_obj_lock*/
@@ -4726,6 +4726,9 @@ void RGWDeleteObj::execute()
       }
     }
 
+    // ignore return value from get_obj_attrs in all other cases
+    op_ret = 0;
+
     if (check_obj_lock) {
       auto aiter = attrs.find(RGW_ATTR_OBJECT_RETENTION);
       if (aiter != attrs.end()) {
@@ -4828,9 +4831,10 @@ void RGWDeleteObj::execute()
   } else {
     op_ret = -EINVAL;
   }
-  // TODO: add etag calculation
-  std::string etag;
-  const auto ret = rgw::notify::publish(s, real_time(), etag, rgw::notify::ObjectRemovedDelete, store);
+
+  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
+          delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
+          store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -5142,7 +5146,6 @@ void RGWCopyObj::execute()
                            this,
                            s->yield);
   
-  // TODO: use s3:ObjectCreated:Copy
   const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
@@ -5820,6 +5823,14 @@ void RGWInitMultipart::execute()
 
     op_ret = obj_op.write_meta(bl.length(), 0, attrs, s->yield);
   } while (op_ret == -EEXIST);
+  
+  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store);
+  if (ret < 0) {
+    ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+       // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+       // this should be global conf (probably returnign a different handler)
+    // so we don't need to read the configured values before we perform it
+  }
 }
 
 int RGWCompleteMultipart::verify_permission()
@@ -6126,6 +6137,14 @@ void RGWCompleteMultipart::execute()
   } else {
     ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
   }
+  
+  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
+  if (ret < 0) {
+    ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+       // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+       // this should be global conf (probably returnign a different handler)
+    // so we don't need to read the configured values before we perform it
+  }
 }
 
 int RGWCompleteMultipart::MPSerializer::try_lock(
index dd825e0570b3599d7a7cc09f74795b38fd0c5cff..44292a3ce607b491902ba479218d5809e655e3b4 100644 (file)
@@ -160,6 +160,7 @@ private:
   CephContext* const cct;
   const std::string endpoint;
   const std::string topic;
+  const std::string exchange;
   amqp::connection_ptr_t conn;
   ack_level_t ack_level;
   std::string str_ack_level;
@@ -271,8 +272,9 @@ public:
       CephContext* _cct) : 
         cct(_cct),
         endpoint(_endpoint), 
-        topic(_topic), 
-        conn(amqp::connect(endpoint, get_exchange(args))) {
+        topic(_topic),
+        exchange(get_exchange(args)),
+        conn(amqp::connect(endpoint, exchange)) {
     if (!conn) { 
       throw configuration_error("AMQP: failed to create connection to: " + endpoint);
     }
@@ -393,6 +395,7 @@ public:
     std::string str("AMQP(0.9.1) Endpoint");
     str += "\nURI: " + endpoint;
     str += "\nTopic: " + topic;
+    str += "\nExchange: " + exchange;
     str += "\nAck Level: " + str_ack_level;
     return str;
   }
index 4a2e365fd2d742f05db397707d681b95742728f1..e4bf0bf326f997bc656883ef6756985cde22374a 100644 (file)
@@ -509,7 +509,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
       ups->remove_topic(unique_topic_name);
       return;
     }
-    ldout(s->cct, 20) << "successfully auto-generated notification for unique topic'" << unique_topic_name << "'" << dendl;
+    ldout(s->cct, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
   
     if (!push_only) {
       // generate the subscription with destination information from the original topic
index b1f250313814d0fc448f73d26ee9b2159976cf27..b198bb33b19f8e27daccfe5b274cabcb429bb14c 100644 (file)
@@ -346,16 +346,14 @@ private:
     }
 
     std::string events_str = s->info.args.get("events", &exists);
-    if (exists) {
-        rgw::notify::from_string_list(events_str, events);
-        if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
-            ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
-            return -EINVAL;
-        }
-    } else {
-        // if no events are provided, we assume all events
-        events.push_back(rgw::notify::ObjectCreated);
-        events.push_back(rgw::notify::ObjectRemoved);
+    if (!exists) {
+      // if no events are provided, we notify on all of them
+      events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
+    }
+    rgw::notify::from_string_list(events_str, events);
+    if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
+      ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
+      return -EINVAL;
     }
     return notif_bucket_path(s->object.name, bucket_name);
   }
index 786a1c6617e76f40c686d3a17a7f678438dc166a..a3b7807582e45302e00ac05e0135365d52828a4d 100644 (file)
@@ -20,7 +20,7 @@ from .tests import get_realm, \
     gen_bucket_name, \
     get_user, \
     get_tenant
-from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
+from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info, delete_all_s3_topics
 from multisite import User
 from nose import SkipTest
 from nose.tools import assert_not_equal, assert_equal
@@ -152,22 +152,8 @@ class AMQPReceiver(object):
     def __init__(self, exchange, topic):
         import pika
         hostname = get_ip()
-        retries = 20
-        connect_ok = False
-        while retries > 0:
-            try:
-                connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
-                connect_ok = True
-                break
-            except Exception as error:
-                retries -= 1
-                print 'AMQP receiver failed to connect (try %d): %s' % (10 - retries, str(error))
-                log.info('AMQP receiver failed to connect (try %d): %s', 10 - retries, str(error))
-                time.sleep(2)
-
-        if connect_ok == False:
-            raise error
-            
+        connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
+
         self.channel = connection.channel()
         self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
         result = self.channel.queue_declare('', exclusive=True)
@@ -177,10 +163,11 @@ class AMQPReceiver(object):
                                    on_message_callback=self.on_message,
                                    auto_ack=True)
         self.events = []
+        self.topic = topic
 
     def on_message(self, ch, method, properties, body):
         """callback invoked when a new message arrive on the topic"""
-        log.info('AMQP received event: %s', body)
+        log.info('AMQP received event for topic %s:\n %s', self.topic, body)
         self.events.append(json.loads(body))
 
     # TODO create a base class for the AMQP and HTTP cases
@@ -194,6 +181,11 @@ class AMQPReceiver(object):
         verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
         self.events = []
 
+    def get_and_reset_events(self):
+        tmp = self.events
+        self.events = []
+        return tmp
+
 
 def amqp_receiver_thread_runner(receiver):
     """main thread function for the amqp receiver"""
@@ -613,6 +605,9 @@ def test_ps_s3_topic_on_master():
     zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
+
+    # clean all topics
+    delete_all_s3_topics(zones[0].conn, zonegroup.name)
    
     # create s3 topics
     endpoint_address = 'amqp://127.0.0.1:7001'
@@ -635,44 +630,36 @@ def test_ps_s3_topic_on_master():
     assert_equal(topic_arn,
                  'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')
 
-    try:
-        # get topic 3
-        result, status = topic_conf3.get_config()
-        assert_equal(status, 200)
-        assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
-        assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
-        # Note that endpoint args may be ordered differently in the result
-
-        # delete topic 1
-        result = topic_conf1.del_config()
-        assert_equal(status, 200)
-
-        # try to get a deleted topic (1)
-        _, status = topic_conf1.get_config()
-        assert_equal(status, 404)
-
-        # get the remaining 2 topics
-        result = topic_conf1.get_list()
-        assert_equal(len(result['Topics']), 2)
-        
-        # delete topics
-        result = topic_conf2.del_config()
-        # TODO: should be 200OK
-        # assert_equal(status, 200)
-        result = topic_conf3.del_config()
-        # TODO: should be 200OK
-        # assert_equal(status, 200)
-
-        # get topic list, make sure it is empty
-        result = topic_conf1.get_list()
-        assert_equal(len(result['Topics']), 0)
-    except AssertionError as e:
-        # topics are stored at user level, so cleanup is needed
-        # to prevent failures in consequent runs
-        topic_conf1.del_config()
-        topic_conf2.del_config()
-        topic_conf3.del_config()
-        raise e
+    # get topic 3
+    result, status = topic_conf3.get_config()
+    assert_equal(status, 200)
+    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+    # Note that endpoint args may be ordered differently in the result
+
+    # delete topic 1
+    result = topic_conf1.del_config()
+    assert_equal(status, 200)
+
+    # try to get a deleted topic
+    _, status = topic_conf1.get_config()
+    assert_equal(status, 404)
+
+    # get the remaining 2 topics
+    result = topic_conf1.get_list()
+    assert_equal(len(result['Topics']), 2)
+    
+    # delete topics
+    result = topic_conf2.del_config()
+    # TODO: should be 200OK
+    # assert_equal(status, 200)
+    result = topic_conf3.del_config()
+    # TODO: should be 200OK
+    # assert_equal(status, 200)
+
+    # get topic list, make sure it is empty
+    result = topic_conf1.get_list()
+    assert_equal(len(result['Topics']), 0)
 
 
 def test_ps_s3_notification_on_master():
@@ -779,8 +766,9 @@ def test_ps_s3_notification_errors_on_master():
       assert False, 'missing notification name is expected to fail'
 
     # create s3 notification with invalid topic ARN
+    invalid_topic_arn = 'kaboom'
     topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': 'kaboom',
+                        'TopicArn': invalid_topic_arn,
                         'Events': ['s3:ObjectCreated:Put']
                        }]
     s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
@@ -791,6 +779,20 @@ def test_ps_s3_notification_errors_on_master():
     else:
       assert False, 'invalid ARN is expected to fail'
 
+    # create s3 notification with unknown topic ARN
+    invalid_topic_arn = 'arn:aws:sns:a::kaboom'
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': invalid_topic_arn ,
+                        'Events': ['s3:ObjectCreated:Put']
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    try:
+      _, _ = s3_notification_conf.set_config()
+    except Exception as error:
+      print str(error) + ' - is expected'
+    else:
+      assert False, 'unknown topic is expected to fail'
+
     # create s3 notification with wrong bucket
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
@@ -804,8 +806,16 @@ def test_ps_s3_notification_errors_on_master():
     else:
       assert False, 'unknown bucket is expected to fail'
 
-    # cleanup
     topic_conf.del_config()
+
+    status = topic_conf.del_config()
+    # deleting an unknown notification is not considered an error
+    assert_equal(status, 200)
+    
+    _, status = topic_conf.get_config()
+    assert_equal(status, 404)
+    
+    # cleanup
     # delete the bucket
     zones[0].delete_bucket(bucket_name)
 
@@ -905,7 +915,7 @@ def test_ps_s3_notification_push_amqp_on_master():
     start_time = time.time()
     for i in range(number_of_objects):
         key = bucket.new_key(str(i))
-        content = 'bar'
+        content = str(os.urandom(1024*1024))
         thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
         thr.start()
         client_threads.append(thr)
@@ -1596,7 +1606,7 @@ def test_ps_s3_creation_triggers_on_master():
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy', 's3:ObjectCreated:Post']
+                        'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
                        }]
 
     s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
@@ -1636,55 +1646,178 @@ def test_ps_s3_creation_triggers_on_master():
     clean_rabbitmq(proc)
 
 
+def test_ps_s3_multipart_on_master():
+    """ test multipart object upload on master"""
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    zones, _  = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # start amqp receivers
+    exchange = 'ex1'
+    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
+    task1.start()
+    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
+    task2.start()
+    task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
+    task3.start()
+
+    # create s3 topics
+    endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
+    topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn1 = topic_conf1.set_config()
+    topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn2 = topic_conf2.set_config()
+    topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn3 = topic_conf3.set_config()
+
+    # create s3 notifications
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
+                        'Events': ['s3:ObjectCreated:*']
+                       },
+                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
+                        'Events': ['s3:ObjectCreated:Post']
+                       },
+                       {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
+                        'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket using multi-part upload
+    fp = tempfile.TemporaryFile(mode='w+b')
+    content = bytearray(os.urandom(1024*1024))
+    fp.write(content)
+    fp.flush()
+    fp.seek(0)
+    uploader = bucket.initiate_multipart_upload('multipart')
+    uploader.upload_part_from_file(fp, 1)
+    uploader.complete_upload()
+    fp.close()
+
+    print 'wait for 5sec for the messages...'
+    time.sleep(5)
+
+    # check amqp receiver
+    events = receiver1.get_and_reset_events()
+    assert_equal(len(events), 3)
+
+    events = receiver2.get_and_reset_events()
+    assert_equal(len(events), 1)
+    assert_equal(events[0]['eventName'], 's3:ObjectCreated:Post')
+    assert_equal(events[0]['s3']['configurationId'], notification_name+'_2')
+
+    events = receiver3.get_and_reset_events()
+    assert_equal(len(events), 1)
+    assert_equal(events[0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
+    assert_equal(events[0]['s3']['configurationId'], notification_name+'_3')
+
+    # cleanup
+    stop_amqp_receiver(receiver1, task1)
+    stop_amqp_receiver(receiver2, task2)
+    stop_amqp_receiver(receiver3, task3)
+    s3_notification_conf.del_config()
+    topic_conf1.del_config()
+    topic_conf2.del_config()
+    topic_conf3.del_config()
+    for key in bucket.list():
+        key.delete()
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
+
 def test_ps_versioned_deletion():
     """ test notification of deletion markers """
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
-    # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
-    topic_conf.set_config()
+    # create topics
+    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name+'_1')
+    _, status = topic_conf1.set_config()
+    assert_equal(status/100, 2)
+    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name+'_2')
+    _, status = topic_conf2.set_config()
+    assert_equal(status/100, 2)
+    
     # create bucket on the first of the rados zones
     bucket = zones[0].create_bucket(bucket_name)
     bucket.configure_versioning(True)
+    
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
+    
     # create notifications
-    # TODO use 'DELETE_MARKER_CREATE'
-    event_type = 'OBJECT_DELETE'
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
-                                       topic_name, event_type)
-    _, status = notification_conf.set_config()
+    event_type1 = 'OBJECT_DELETE'
+    notification_conf1 = PSNotification(ps_zones[0].conn, bucket_name,
+                                        topic_name+'_1',
+                                        event_type1)
+    _, status = notification_conf1.set_config()
+    assert_equal(status/100, 2)
+    event_type2 = 'DELETE_MARKER_CREATE'
+    notification_conf2 = PSNotification(ps_zones[0].conn, bucket_name,
+                                        topic_name+'_2',
+                                        event_type2)
+    _, status = notification_conf2.set_config()
     assert_equal(status/100, 2)
-    # create subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
-                              topic_name)
-    _, status = sub_conf.set_config()
+    
+    # create subscriptions
+    sub_conf1 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_1',
+                               topic_name+'_1')
+    _, status = sub_conf1.set_config()
+    assert_equal(status/100, 2)
+    sub_conf2 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_2',
+                               topic_name+'_2')
+    _, status = sub_conf2.set_config()
     assert_equal(status/100, 2)
+    
     # create objects in the bucket
     key = bucket.new_key('foo')
     key.set_contents_from_string('bar')
     v1 = key.version_id
     key.set_contents_from_string('kaboom')
     v2 = key.version_id
+    # create deletion marker
+    delete_marker_key = bucket.delete_key(key.name)
+    
     # wait for sync
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
-    # set delete markers
+
+    # delete the deletion marker
+    delete_marker_key.delete()
+    # delete versions
     bucket.delete_key(key.name, version_id=v2)
     bucket.delete_key(key.name, version_id=v1)
+    
     # wait for sync
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
 
     # get the delete events from the subscription
-    result, _ = sub_conf.get_events()
+    result, _ = sub_conf1.get_events()
     parsed_result = json.loads(result)
     for event in parsed_result['events']:
         log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
-        assert_equal(str(event['event']), event_type)
+        assert_equal(str(event['event']), event_type1)
 
-    # TODO: verify we have exactly 2 events
-    assert len(parsed_result['events']) >= 2
+    result, _ = sub_conf2.get_events()
+    parsed_result = json.loads(result)
+    for event in parsed_result['events']:
+        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
+        assert_equal(str(event['event']), event_type2)
 
     # cleanup
     # follwing is needed for the cleanup in the case of 3-zones
@@ -1697,9 +1830,12 @@ def test_ps_versioned_deletion():
         zones[0].delete_bucket(bucket_name)
     except:
         log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
-    sub_conf.del_config()
-    notification_conf.del_config()
-    topic_conf.del_config()
+    sub_conf1.del_config()
+    sub_conf2.del_config()
+    notification_conf1.del_config()
+    notification_conf2.del_config()
+    topic_conf1.del_config()
+    topic_conf2.del_config()
 
 
 def test_ps_s3_metadata_on_master():
@@ -1791,8 +1927,14 @@ def test_ps_s3_versioned_deletion_on_master():
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
-    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectRemoved:Delete', 's3:ObjectCreated:Put']
+    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectRemoved:*']
+                       },
+                       {'Id': notification_name+'_2', 'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectRemoved:DeleteMarkerCreated']
+                       },
+                       {'Id': notification_name+'_3', 'TopicArn': topic_arn,
+                         'Events': ['s3:ObjectRemoved:Delete']
                        }]
     s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
@@ -1804,24 +1946,37 @@ def test_ps_s3_versioned_deletion_on_master():
     v1 = key.version_id
     key.set_contents_from_string('kaboom')
     v2 = key.version_id
-    keys = list(bucket.list())
+    # create delete marker (non versioned deletion)
+    delete_marker_key = bucket.delete_key(key.name)
     
-    print 'wait for 5sec for the messages...'
-    time.sleep(5)
+    time.sleep(1)
     
-    # check amqp receiver
-    # Note: should not do exact match in case of versioned objects
-    receiver.verify_s3_events(keys, exact_match=False)
-    # set delete markers
+    # versioned deletion
     bucket.delete_key(key.name, version_id=v2)
     bucket.delete_key(key.name, version_id=v1)
+    delete_marker_key.delete()
 
     print 'wait for 5sec for the messages...'
     time.sleep(5)
 
     # check amqp receiver
-    # Note: should not do exact match in case of versioned objects
-    receiver.verify_s3_events(keys, exact_match=False, deletions=True)
+    events = receiver.get_and_reset_events()
+    delete_events = 0
+    delete_marker_create_events = 0
+    for event in events:
+        if event['eventName'] == 's3:ObjectRemoved:Delete':
+            delete_events += 1
+            assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
+        if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
+            delete_marker_create_events += 1
+            assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
+   
+    # 3 key versions were deleted (v1, v2 and the deletion marker)
+    # notified over the same topic via 2 notifications (1,3)
+    assert_equal(delete_events, 3*2)
+    # 1 deletion marker was created
+    # notified over the same topic over 2 notifications (1,2)
+    assert_equal(delete_marker_create_events, 1*2)
 
     # cleanup
     stop_amqp_receiver(receiver, task)
index d2aa5d346cebb2cf3ef90d8e17817ea7d9763a8b..8ee49f2036b164d1bda7b7a5c968908a84d5db2a 100644 (file)
@@ -130,6 +130,23 @@ class PSTopic:
         return self.send_request('GET', get_list=True)
 
 
+def delete_all_s3_topics(conn, region):
+    try:
+        client = boto3.client('sns',
+                              endpoint_url='http://'+conn.host+':'+str(conn.port),
+                              aws_access_key_id=conn.aws_access_key_id,
+                              aws_secret_access_key=conn.aws_secret_access_key,
+                              region_name=region,
+                              config=Config(signature_version='s3'))
+
+        topics = client.list_topics()['Topics']
+        for topic in topics:
+            print 'topic cleanup, deleting: ' + topic['TopicArn']
+            assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
+    except:
+        print 'failed to do topic cleanup. if there are topics they may need to be manually deleted'
+    
+
 class PSTopicS3:
     """class to set/list/get/delete a topic
     POST ?Action=CreateTopic&Name=<topic name>&push-endpoint=<endpoint>&[<arg1>=<value1>...]]