]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: fix doc on updates. attempt to fix multi-notifications
authorYuval Lifshitz <yuvalif@yahoo.com>
Thu, 2 May 2019 17:33:34 +0000 (20:33 +0300)
committerYuval Lifshitz <yuvalif@yahoo.com>
Thu, 2 May 2019 17:33:34 +0000 (20:33 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
doc/radosgw/pubsub-module.rst
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_sync_module_pubsub_rest.cc
src/rgw/rgw_tools.cc
src/rgw/rgw_tools.h
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py
src/test/rgw/test_multi.py

index 5185ef06edf63a1d1e3768f3295f4be3417e63dc..0c65f1ca81693a844814e6db6d6bce852ff6372d 100644 (file)
@@ -136,6 +136,9 @@ This will create a new topic. Topic creation is needed both for both flavors of
 Optionally the topic could be provided with push endpoint parameters that would be used later
 when an S3-compatible notification is created.
 Upon successful request, the response will include the topic ARN that could be later used to reference this topic in an S3-compatible notification request. 
+To update a topic, use the same command used for topic creation, with the topic name of an existing topic and different endpoint values.
+
+.. tip:: Any S3-compatible notification already associated with the topic needs to be re-created for the topic update to take effect 
 
 ::
 
index d797efa9e8277304416307db61dd1fdd20d76c70..1bbfbf594e9eadfd0f8a86621be3f2c583ae3f0e 100644 (file)
@@ -247,18 +247,22 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const E
 
   int ret = ps->get_topic(topic_name, &user_topic_info);
   if (ret < 0) {
-    ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
+    ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
     return ret;
   }
+  ldout(store->ctx(), 20) << "successfully read topic '" << topic_name << "' info" << dendl;
 
   RGWObjVersionTracker objv_tracker;
   rgw_pubsub_bucket_topics bucket_topics;
 
   ret = read_topics(&bucket_topics, &objv_tracker);
-  if (ret < 0 && ret != -ENOENT) {
-    ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
+  if (ret < 0) {
+    ldout(store->ctx(), 1) << "ERROR: failed to read topics from bucket '" << 
+      bucket.name << "': ret=" << ret << dendl;
     return ret;
   }
+  ldout(store->ctx(), 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" << 
+    bucket.name << "'" << dendl;
 
   auto& topic_filter = bucket_topics.topics[topic_name];
   topic_filter.topic = user_topic_info.topic;
@@ -266,9 +270,11 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const E
 
   ret = write_topics(bucket_topics, &objv_tracker);
   if (ret < 0) {
-    ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+    ldout(store->ctx(), 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl;
     return ret;
   }
+    
+  ldout(store->ctx(), 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl;
 
   return 0;
 }
@@ -288,7 +294,7 @@ int RGWUserPubSub::Bucket::remove_notification(const string& topic_name)
   rgw_pubsub_bucket_topics bucket_topics;
 
   ret = read_topics(&bucket_topics, &objv_tracker);
-  if (ret < 0 && ret != -ENOENT) {
+  if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
     return ret;
   }
index 4dbaf5023a5dc8f428c1d01c1221d22698609bfe..b63c58bb13b194a9fb78e904dea8e74cef6fbb76 100644 (file)
@@ -457,15 +457,31 @@ public:
     rgw_bucket bucket;
     rgw_raw_obj bucket_meta_obj;
 
+    // read the list of topics associated with a bucket and populate into result
+    // use version tracker to enforce atomicity between read/write
+    // return 0 on success or if no topic was associated with the bucket, error code otherwise
     int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
+    // set the list of topics associated with a bucket
+    // use version tracker to enforce atomicity between read/write
+    // return 0 on success, error code otherwise
     int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker);
   public:
     Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
       ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
     }
 
+    // read the list of topics associated with a bucket and populate into result
+    // return 0 on success or if no topic was associated with the bucket, error code otherwise
     int get_topics(rgw_pubsub_bucket_topics *result);
+    // adds a topic + filter (event list) to a bucket
+    // if the topic already exists on the bucket, the filter event list may be updated
+    // return -ENOENT if the topic does not exist
+    // return 0 on success, error code otherwise
     int create_notification(const string& topic_name, const EventTypeList& events);
+    // remove a topic and filter from bucket
+    // if the topic does not exist on the bucket it is a no-op (considered success)
+    // return -ENOENT if the topic does not exist
+    // return 0 on success, error code otherwise
     int remove_notification(const string& topic_name);
   };
 
@@ -554,10 +570,24 @@ public:
     *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sub_meta_oid(name));
   }
 
+  // get all topics defined for the user and populate them into "result"
+  // return 0 on success or if no topics exist, error code otherwise
   int get_user_topics(rgw_pubsub_user_topics *result);
+  // get a topic by its name and populate it into "result"
+  // return -ENOENT if the topic does not exist
+  // return 0 on success, error code otherwise
   int get_topic(const string& name, rgw_pubsub_topic_subs *result);
+  // create a topic with a name only
+  // if the topic already exists it is a no-op (considered success)
+  // return 0 on success, error code otherwise
   int create_topic(const string& name);
+  // create a topic with push destination information and ARN
+  // if the topic already exists the destination and ARN values may be updated (considered success)
+  // return 0 on success, error code otherwise
   int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn);
+  // remove a topic according to its name
+  // if the topic does not exist it is a no-op (considered success)
+  // return 0 on success, error code otherwise
   int remove_topic(const string& name);
 };
 
@@ -591,7 +621,7 @@ int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTra
   bufferlist bl;
   encode(info, bl);
 
-  int ret = rgw_put_system_obj(store, obj.pool, obj.oid,
+  int ret = rgw_put_system_obj(store, obj_ctx, obj.pool, obj.oid,
                            bl, false, objv_tracker,
                            real_time());
   if (ret < 0) {
index f80c05e0b1c423eade23c70d838c09942c8d6eaa..3419e358170f57fb6e0a664d9b1beb41b65f0bbb 100644 (file)
@@ -48,9 +48,10 @@ void RGWPSCreateTopicOp::execute()
   ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
   op_ret = ups->create_topic(topic_name, dest, topic_arn);
   if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to create topic, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 20) << "successfully created topic '" << topic_name << "'" << dendl;
 }
 
 // command: PUT /topics/<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
@@ -121,6 +122,7 @@ void RGWPSListTopicsOp::execute()
     ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 20) << "successfully got topics" << dendl;
 }
 
 // command: GET /topics
@@ -177,6 +179,7 @@ void RGWPSGetTopicOp::execute()
     ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 1) << "successfully got topic '" << topic_name << "'" << dendl;
 }
 
 // command: GET /topics/<topic-name>
@@ -235,9 +238,10 @@ void RGWPSDeleteTopicOp::execute()
   ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
   op_ret = ups->remove_topic(topic_name);
   if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to remove topic, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to remove topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
 }
 
 // command: DELETE /topics/<topic-name>
@@ -324,9 +328,10 @@ void RGWPSCreateSubOp::execute()
   auto sub = ups->get_sub(sub_name);
   op_ret = sub->subscribe(topic_name, dest);
   if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to create subscription, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 20) << "successfully created subscription '" << sub_name << "'" << dendl;
 }
 
 // command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
@@ -387,9 +392,10 @@ void RGWPSGetSubOp::execute()
   auto sub = ups->get_sub(sub_name);
   op_ret = sub->get_conf(&result);
   if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 20) << "successfully got subscription '" << sub_name << "'" << dendl;
 }
 
 // command: GET /subscriptions/<sub-name>
@@ -448,9 +454,10 @@ void RGWPSDeleteSubOp::execute()
   auto sub = ups->get_sub(sub_name);
   op_ret = sub->unsubscribe(topic_name);
   if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to remove subscription, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 20) << "successfully removed subscription '" << sub_name << "'" << dendl;
 }
 
 // command: DELETE /subscriptions/<sub-name>
@@ -498,9 +505,10 @@ void RGWPSAckSubEventOp::execute()
   auto sub = ups->get_sub_with_events(sub_name);
   op_ret = sub->remove_event(event_id);
   if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to ack event, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 20) << "successfully acked event on subscription '" << sub_name << "'" << dendl;
 }
 
 // command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
@@ -561,14 +569,15 @@ void RGWPSPullSubEventsOp::execute()
   sub = ups->get_sub_with_events(sub_name);
   if (!sub) {
     op_ret = -ENOENT;
-    ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
     return;
   }
   op_ret = sub->list_events(marker, max_entries);
   if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to get subscription events, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to get events from subscription '" << sub_name << "', ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 20) << "successfully got events from subscription '" << sub_name << "'" << dendl;
 }
 
 // command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
@@ -746,9 +755,10 @@ void RGWPSCreateNotif_ObjStore_Ceph::execute()
   auto b = ups->get_bucket(bucket_info.bucket);
   op_ret = b->create_notification(topic_name, events);
   if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to create notification, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 20) << "successfully created notification for topic '" << topic_name << "'" << dendl;
 }
 
 namespace {
@@ -923,7 +933,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
     dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + unique_topic_name;
     dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
     auto sub = ups->get_sub(sub_name);
-    op_ret = sub->subscribe(unique_topic_name, dest, c.id);
+    op_ret = sub->subscribe(unique_topic_name, dest, sub_name);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to auto-generate subscription '" << sub_name << "', ret=" << op_ret << dendl;
       // rollback generated notification (ignore return value)
@@ -932,6 +942,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
       ups->remove_topic(unique_topic_name);
       return;
     }
+    ldout(s->cct, 20) << "successfully auto-generated subscription '" << sub_name << "'" << dendl;
   }
 }
 
@@ -1001,9 +1012,10 @@ void RGWPSDeleteNotif_ObjStore_Ceph::execute() {
   auto b = ups->get_bucket(bucket_info.bucket);
   op_ret = b->remove_notification(topic_name);
   if (op_ret < 0) {
-    ldout(s->cct, 1) << "failed to remove notification, ret=" << op_ret << dendl;
+    ldout(s->cct, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
   }
+  ldout(s->cct, 20) << "successfully removed notification from topic '" << topic_name << "'" << dendl;
 }
 
 // command (extension to S3): DELETE /bucket?notification[=<notification-id>]
index 1fb18aee8abacf5c97bec51118b0f731fd35c1ef..dcec9c43aa68328b8c256c4f477d8babbaa3e2f5 100644 (file)
@@ -129,6 +129,39 @@ int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& o
   return ret;
 }
 
+int rgw_put_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
+                       RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs)
+{
+  map<string,bufferlist> no_attrs;
+  if (!pattrs) {
+    pattrs = &no_attrs;
+  }
+
+  rgw_raw_obj obj(pool, oid);
+
+  auto sysobj = obj_ctx.get_obj(obj);
+  int ret = sysobj.wop()
+                  .set_objv_tracker(objv_tracker)
+                  .set_exclusive(exclusive)
+                  .set_mtime(set_mtime)
+                  .set_attrs(*pattrs)
+                  .write(data, null_yield);
+
+  if (ret == -ENOENT) {
+    ret = rgwstore->create_pool(pool);
+    if (ret >= 0) {
+      ret = sysobj.wop()
+                  .set_objv_tracker(objv_tracker)
+                  .set_exclusive(exclusive)
+                  .set_mtime(set_mtime)
+                  .set_attrs(*pattrs)
+                  .write(data, null_yield);
+    }
+  }
+
+  return ret;
+}
+
 int rgw_get_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
                        RGWObjVersionTracker *objv_tracker, real_time *pmtime, map<string, bufferlist> *pattrs,
                        rgw_cache_entry_info *cache_info, boost::optional<obj_version> refresh_version)
index 927915e9ef0cd5c02ffc2590080b13960b2131e2..30c9581d1bf29f4f3ec513bc916228ace9fece62 100644 (file)
@@ -24,6 +24,8 @@ int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool,
 
 int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
                        RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs = NULL);
+int rgw_put_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
+                       RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs = NULL);
 int rgw_get_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
                        RGWObjVersionTracker *objv_tracker, real_time *pmtime, map<string, bufferlist> *pattrs = NULL,
                        rgw_cache_entry_info *cache_info = NULL,
index 8846f5a8008807a07382c86b5e3a207589fd5dc9..3d7c16e5b2c535a804c45b20a3a811b55b74f8c5 100644 (file)
@@ -172,9 +172,12 @@ def test_ps_s3_notification_low_level():
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     generated_topic_name = notification_name+'_'+topic_name
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
-                                            notification_name, topic_arn, ['s3:ObjectCreated:*'])
-    response, status = s3_notification_conf.set_config()
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*']
+                        }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
     zone_meta_checkpoint(ps_zones[0].zone)
     # get auto-generated topic
@@ -198,7 +201,7 @@ def test_ps_s3_notification_low_level():
     assert_equal(status/100, 2)
     assert_equal(parsed_result['topic'], generated_topic_name)
     # delete s3 notification
-    _, status = s3_notification_conf.del_config(all_notifications=False)
+    _, status = s3_notification_conf.del_config(notification=notification_name)
     assert_equal(status/100, 2)
     # delete topic
     _, status = topic_conf.del_config()
@@ -241,9 +244,12 @@ def test_ps_s3_notification_records():
     topic_arn = parsed_result['arn']
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, 
-                                            notification_name, topic_arn, ['s3:ObjectCreated:*'])
-    response, status = s3_notification_conf.set_config()
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*']
+                        }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
     zone_meta_checkpoint(ps_zones[0].zone)
     # get auto-generated subscription
@@ -295,15 +301,20 @@ def test_ps_s3_notification():
     topic_arn = parsed_result['arn']
     # create one s3 notification
     notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
-    s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name,
-                                             notification_name1, topic_arn, ['s3:ObjectCreated:*'])
+    topic_conf_list = [{'Id': notification_name1,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*']
+                        }]
+    s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf1.set_config()
     assert_equal(status/100, 2)
-    # create another s3 notification
+    # create another s3 notification with the same topic
     notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
-
-    s3_notification_conf2 = PSNotificationS3(ps_zones[0].conn, bucket_name,
-                                             notification_name2, topic_arn, ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'])
+    topic_conf_list = [{'Id': notification_name2,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
+                        }]
+    s3_notification_conf2 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf2.set_config()
     assert_equal(status/100, 2)
     zone_meta_checkpoint(ps_zones[0].zone)
@@ -316,19 +327,19 @@ def test_ps_s3_notification():
     assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)
 
     # get specific notification on a bucket
-    response, status = s3_notification_conf1.get_config(all_notifications=False)
+    response, status = s3_notification_conf1.get_config(notification=notification_name1)
     assert_equal(status/100, 2)
     assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
     assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name1)
-    response, status = s3_notification_conf2.get_config(all_notifications=False)
+    response, status = s3_notification_conf2.get_config(notification=notification_name2)
     assert_equal(status/100, 2)
     assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
     assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name2)
 
     # delete specific notifications
-    _, status = s3_notification_conf1.del_config(all_notifications=False)
+    _, status = s3_notification_conf1.del_config(notification=notification_name1)
     assert_equal(status/100, 2)
-    _, status = s3_notification_conf2.del_config(all_notifications=False)
+    _, status = s3_notification_conf2.del_config(notification=notification_name2)
     assert_equal(status/100, 2)
 
     # cleanup
@@ -361,7 +372,8 @@ def test_ps_topic():
     _, status = topic_conf.del_config()
     assert_equal(status/100, 2)
     # verify topic is deleted
-    result, _ = topic_conf.get_config()
+    result, status = topic_conf.get_config()
+    assert_equal(status, 404)
     parsed_result = json.loads(result)
     assert_equal(parsed_result['Code'], 'NoSuchKey')
 
@@ -976,8 +988,11 @@ def test_ps_s3_push_http():
     zone_meta_checkpoint(ps_zones[0].zone)
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
-                                            notification_name, topic_arn, ['s3:ObjectCreated:*'])
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*']
+                        }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
     # create objects in the bucket
@@ -1054,7 +1069,7 @@ def test_ps_push_amqp():
 
 
 def test_ps_s3_push_amqp():
-    """ test pushing to amqp endpoint s3 record format"""
+    """ test pushing to amqp endpoint s3 record format"""
     return SkipTest("PubSub push tests are only manual")
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
@@ -1066,15 +1081,19 @@ def test_ps_s3_push_amqp():
                          endpoint_args='amqp-exchange=ex1&amqp-ack-level=none')
     result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
-    topic_arn = 'arn:aws:sns:::' + topic_name
+    parsed_result = json.loads(result)
+    topic_arn = parsed_result['arn']
     # create bucket on the first of the rados zones
     bucket = zones[0].create_bucket(bucket_name)
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
-                                            notification_name, topic_arn, ['s3:ObjectCreated:*'])
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*']
+                        }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
     # create objects in the bucket
@@ -1118,8 +1137,11 @@ def test_ps_delete_bucket():
     topic_arn = parsed_result['arn']
     # create one s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
-                                             notification_name, topic_arn, ['s3:ObjectCreated:*'])
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*']
+                        }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
    
@@ -1156,7 +1178,7 @@ def test_ps_delete_bucket():
     verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
 
     # s3 notification is deleted with bucket
-    _, status = s3_notification_conf.get_config(all_notifications=False)
+    _, status = s3_notification_conf.get_config(notification=notification_name)
     assert_equal(status, 404)
     # non-s3 notification is deleted with bucket
     _, status = notification_conf.get_config()
@@ -1179,8 +1201,11 @@ def test_ps_missing_topic():
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_arn = 'arn:aws:sns:::' + topic_name
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
-                                            notification_name, topic_arn, ['s3:ObjectCreated:*'])
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*']
+                        }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     try:
         s3_notification_conf.set_config()
     except:
@@ -1190,3 +1215,246 @@ def test_ps_missing_topic():
 
     # cleanup
     zones[0].delete_bucket(bucket_name)
+
+
+def test_ps_s3_topic_update():
+    """ test updating topic associated with a notification"""
+    return SkipTest("PubSub push tests are only manual")
+    zones, ps_zones = init_env()
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name+TOPIC_SUFFIX
+
+    # create topic
+    dest_endpoint1 = 'amqp://localhost'
+    dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none'
+    dest_endpoint2 = 'http://localhost:9001'
+    topic_conf = PSTopic(ps_zones[0].conn, topic_name, 
+                         endpoint=dest_endpoint1,
+                         endpoint_args=dest_args1)
+    result, status = topic_conf.set_config()
+    parsed_result = json.loads(result)
+    topic_arn = parsed_result['arn']
+    assert_equal(status/100, 2)
+    # get topic
+    result, _ = topic_conf.get_config()
+    # verify topic content
+    parsed_result = json.loads(result)
+    assert_equal(parsed_result['topic']['name'], topic_name)
+    assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint1)
+
+    # create bucket on the first of the rados zones
+    bucket = zones[0].create_bucket(bucket_name)
+    # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*']
+                        }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+    # create objects in the bucket
+    number_of_objects = 10
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        key.set_contents_from_string('bar')
+    # wait for sync
+    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+
+    # TODO: check update to amqp
+
+    # update the same topic
+    topic_conf = PSTopic(ps_zones[0].conn, topic_name, 
+                         endpoint=dest_endpoint2)
+    _, status = topic_conf.set_config()
+    assert_equal(status/100, 2)
+    # get topic
+    result, _ = topic_conf.get_config()
+    # verify topic content
+    parsed_result = json.loads(result)
+    assert_equal(parsed_result['topic']['name'], topic_name)
+    assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint2)
+
+    # create more objects in the bucket
+    number_of_objects = 10
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i+100))
+        key.set_contents_from_string('bar')
+    # wait for sync
+    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+
+    # TODO: check it is still updating amqp
+
+    # update notification to update the endpoint from the topic
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*']
+                        }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+    # create even more objects in the bucket
+    number_of_objects = 10
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i+200))
+        key.set_contents_from_string('bar')
+    # wait for sync
+    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+
+    # TODO: check that updates switched to http
+
+    # cleanup
+    # delete objects from the bucket
+    for key in bucket.list():
+        key.delete()
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    zones[0].delete_bucket(bucket_name)
+
+
+def test_ps_s3_notification_update():
+    """ test updating the topic of a notification"""
+    return SkipTest("PubSub push tests are only manual")
+    zones, ps_zones = init_env()
+    bucket_name = gen_bucket_name()
+    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
+
+    # create first topic
+    dest_endpoint1 = 'amqp://localhost'
+    dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none'
+    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1, 
+                         endpoint=dest_endpoint1,
+                         endpoint_args=dest_args1)
+    result, status = topic_conf1.set_config()
+    parsed_result = json.loads(result)
+    topic_arn1 = parsed_result['arn']
+    assert_equal(status/100, 2)
+
+    # create bucket on the first of the rados zones
+    bucket = zones[0].create_bucket(bucket_name)
+    # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn1,
+                        'Events': ['s3:ObjectCreated:*']
+                        }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+    # create objects in the bucket
+    number_of_objects = 10
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        key.set_contents_from_string('bar')
+    # wait for sync
+    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    result, _ = s3_notification_conf.get_config()
+
+    # TODO: check updates to amqp
+
+    # create another topic
+    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
+    dest_endpoint2 = 'http://localhost:9001'
+    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2, 
+                         endpoint=dest_endpoint2)
+    result, status = topic_conf2.set_config()
+    parsed_result = json.loads(result)
+    topic_arn2 = parsed_result['arn']
+    assert_equal(status/100, 2)
+
+    # update notification to the new topic
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn2,
+                        'Events': ['s3:ObjectCreated:*']
+                        }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+    # create more objects in the bucket
+    number_of_objects = 10
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i+200))
+        key.set_contents_from_string('bar')
+    # wait for sync
+    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+
+    # TODO: check update to http
+    result, _ = s3_notification_conf.get_config()
+
+    # cleanup
+    # delete objects from the bucket
+    for key in bucket.list():
+        key.delete()
+    s3_notification_conf.del_config()
+    topic_conf1.del_config()
+    topic_conf2.del_config()
+    zones[0].delete_bucket(bucket_name)
+
+
+def test_ps_s3_multiple_topics_notification():
+    """ test notification creation with multiple topics"""
+    zones, ps_zones = init_env()
+    bucket_name = gen_bucket_name()
+    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
+    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
+
+    # create topics
+    dest_endpoint1 = 'amqp://localhost'
+    dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none'
+    dest_endpoint2 = 'http://localhost:9001'
+    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1, 
+                         endpoint=dest_endpoint1,
+                         endpoint_args=dest_args1)
+    result, status = topic_conf1.set_config()
+    parsed_result = json.loads(result)
+    topic_arn1 = parsed_result['arn']
+    assert_equal(status/100, 2)
+    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2, 
+                         endpoint=dest_endpoint2)
+    result, status = topic_conf2.set_config()
+    parsed_result = json.loads(result)
+    topic_arn2 = parsed_result['arn']
+    assert_equal(status/100, 2)
+
+    # create bucket on the first of the rados zones
+    bucket = zones[0].create_bucket(bucket_name)
+    # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [
+            {
+                'Id': notification_name + '_1',
+                'TopicArn': topic_arn1,
+                'Events': ['s3:ObjectCreated:*']
+            },
+            {
+                'Id': notification_name + '_2',
+                'TopicArn': topic_arn2,
+                'Events': ['s3:ObjectCreated:*']
+            }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+    result, _ = s3_notification_conf.get_config()
+    print('first try')
+    print(result)
+    # FIXME: this is currently failing
+    #assert_equal(len(result['TopicConfigurations']), 2)
+
+    _, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+    result, _ = s3_notification_conf.get_config()
+    print('second try')
+    print(result)
+    assert_equal(len(result['TopicConfigurations']), 2)
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf1.del_config()
+    topic_conf2.del_config()
+    zones[0].delete_bucket(bucket_name)
index ca97d4c57b6c6d7f07e811ce730ef8aaa2df75aa..0073adc4e561cb820b54041350c154395201a9ed 100644 (file)
@@ -154,14 +154,12 @@ class PSNotificationS3:
     GET /<bucket>?notification[=<notification>]
     DELETE /<bucket>?notification[=<notification>]
     """
-    def __init__(self, conn, bucket_name, notification, topic_arn, events=None):
+    def __init__(self, conn, bucket_name, topic_conf_list):
         self.conn = conn
         assert bucket_name.strip()
         self.bucket_name = bucket_name
         self.resource = '/'+bucket_name
-        self.notification = notification
-        self.topic_arn = topic_arn
-        self.events = events
+        self.topic_conf_list = topic_conf_list
         self.client = boto3.client('s3',
                                    endpoint_url='http://'+conn.host+':'+str(conn.port),
                                    aws_access_key_id=conn.aws_access_key_id,
@@ -173,14 +171,14 @@ class PSNotificationS3:
         return make_request(self.conn, method, self.resource,
                             parameters=parameters, sign_parameters=True)
 
-    def get_config(self, all_notifications=True):
+    def get_config(self, notification=None):
         """get notification info"""
         parameters = None
-        if all_notifications:
+        if notification is None:
             response = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name)
             status = response['ResponseMetadata']['HTTPStatusCode']
             return response, status
-        parameters = {'notification': self.notification}
+        parameters = {'notification': notification}
         response, status = self.send_request('GET', parameters=parameters)
         dict_response = xmltodict.parse(response)
         return dict_response, status
@@ -189,24 +187,14 @@ class PSNotificationS3:
         """set notification"""
         response = self.client.put_bucket_notification_configuration(Bucket=self.bucket_name,
                                                                      NotificationConfiguration={
-                                                                         'TopicConfigurations': [
-                                                                             {
-                                                                                 'Id': self.notification,
-                                                                                 'TopicArn': self.topic_arn,
-                                                                                 'Events': self.events,
-                                                                             }
-                                                                         ]
+                                                                         'TopicConfigurations': self.topic_conf_list
                                                                      })
         status = response['ResponseMetadata']['HTTPStatusCode']
         return response, status
 
-    def del_config(self, all_notifications=True):
+    def del_config(self, notification=None):
         """delete notification"""
-        parameters = None
-        if all_notifications:
-            parameters = {'notification': None}
-        else:
-            parameters = {'notification': self.notification}
+        parameters = {'notification': notification}
 
         return self.send_request('DELETE', parameters)
 
index ebfc5b505a1e5f926c24f270e3ed12b382baab26..e3863c5a9a6e0e3d52635eff833e1061987fd390 100644 (file)
@@ -70,6 +70,8 @@ class Cluster(multisite.Cluster):
             env = os.environ.copy()
             env['CEPH_NUM_MDS'] = '0'
             cmd += ['-n']
+            # cmd += ['-o']
+            # cmd += ['rgw_cache_enabled=false']
         bash(cmd, env=env)
         self.needs_reset = False