From: Yuval Lifshitz Date: Thu, 2 May 2019 17:33:34 +0000 (+0300) Subject: rgw/pubsub: fix doc on updates. attempt to fix multi-notifications X-Git-Tag: v15.1.0~2748^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3279a786ba0abc9d1955f03eaa5f94da0d12ed22;p=ceph.git rgw/pubsub: fix doc on updates. attempt to fix multi-notifications Signed-off-by: Yuval Lifshitz --- diff --git a/doc/radosgw/pubsub-module.rst b/doc/radosgw/pubsub-module.rst index 5185ef06edf63..0c65f1ca81693 100644 --- a/doc/radosgw/pubsub-module.rst +++ b/doc/radosgw/pubsub-module.rst @@ -136,6 +136,9 @@ This will create a new topic. Topic creation is needed both for both flavors of Optionally the topic could be provided with push endpoint parameters that would be used later when an S3-compatible notification is created. Upon successful request, the response will include the topic ARN that could be later used to reference this topic in an S3-compatible notification request. +To update a topic, use the same command used for topic creation, with the topic name of an existing topic and different endpoint values. + +.. tip:: Any S3-compatible notification already associated with the topic needs to be re-created for the topic update to take effect :: diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index d797efa9e8277..1bbfbf594e9ea 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -247,18 +247,22 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const E int ret = ps->get_topic(topic_name, &user_topic_info); if (ret < 0) { - ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl; return ret; } + ldout(store->ctx(), 20) << "successfully read topic '" << topic_name << "' info" << dendl; RGWObjVersionTracker objv_tracker; rgw_pubsub_bucket_topics bucket_topics; ret = read_topics(&bucket_topics, &objv_tracker); - if (ret < 0 && ret != -ENOENT) { - ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; + if (ret < 0) { + ldout(store->ctx(), 1) << "ERROR: failed to read topics from bucket '" << + bucket.name << "': ret=" << ret << dendl; return ret; } + ldout(store->ctx(), 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" << + bucket.name << "'" << dendl; auto& topic_filter = bucket_topics.topics[topic_name]; topic_filter.topic = user_topic_info.topic; @@ -266,9 +270,11 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const E ret = write_topics(bucket_topics, &objv_tracker); if (ret < 0) { - ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl; return ret; } + + ldout(store->ctx(), 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl; return 0; } @@ -288,7 +294,7 @@ int RGWUserPubSub::Bucket::remove_notification(const string& topic_name) rgw_pubsub_bucket_topics bucket_topics; ret = read_topics(&bucket_topics, &objv_tracker); - if (ret < 0 && ret != -ENOENT) { + if (ret < 0) { ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; return ret; } diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 4dbaf5023a5dc..b63c58bb13b19 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -457,15 +457,31 @@ public: rgw_bucket bucket; rgw_raw_obj bucket_meta_obj; + // read the list of topics associated with a bucket and populate into result + // use version tacker to enforce atomicity between read/write + // return 0 on success or if no topic was associated with the bucket, error code otherwise int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker); + // set the list of topics associated with a bucket + // use version tacker to enforce atomicity between read/write + // return 0 on success, error code otherwise int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker); public: Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) { ps->get_bucket_meta_obj(bucket, &bucket_meta_obj); } + // read the list of topics associated with a bucket and populate into result + // return 0 on success or if no topic was associated with the bucket, error code otherwise int get_topics(rgw_pubsub_bucket_topics *result); + // adds a topic + filter (event list) to a bucket + // if the topic already exist on the bucket, the filter event list may be updated + // return -ENOENT if the topic does not exists + // return 0 on success, error code otherwise int create_notification(const string& topic_name, const EventTypeList& events); + // remove a topic and filter from bucket + // if the topic does not exists on the bucket it is a no-op (considered success) + // return -ENOENT if the topic does not exists + // return 0 on success, error code otherwise int remove_notification(const string& topic_name); }; @@ -554,10 +570,24 @@ public: *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sub_meta_oid(name)); } + // get all topics defined for the user and populate them into "result" + // return 0 on success or if no topics exist, error code otherwise int get_user_topics(rgw_pubsub_user_topics *result); + // get a topic by its name and populate it into "result" + // return -ENOENT if the topic does not exists + // return 0 on success, error code otherwise int get_topic(const string& name, rgw_pubsub_topic_subs *result); + // create a topic with a name only + // if the topic already exists it is a no-op (considered success) + // return 0 on success, error code otherwise int create_topic(const string& name); + // create a topic with push destination information and ARN + // if the topic already exists the destination and ARN values may be updated (considered succsess) + // return 0 on success, error code otherwise int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn); + // remove a topic according to its name + // if the topic does not exists it is a no-op (considered success) + // return 0 on success, error code otherwise int remove_topic(const string& name); }; @@ -591,7 +621,7 @@ int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTra bufferlist bl; encode(info, bl); - int ret = rgw_put_system_obj(store, obj.pool, obj.oid, + int ret = rgw_put_system_obj(store, obj_ctx, obj.pool, obj.oid, bl, false, objv_tracker, real_time()); if (ret < 0) { diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc index f80c05e0b1c42..3419e358170f5 100644 --- a/src/rgw/rgw_sync_module_pubsub_rest.cc +++ b/src/rgw/rgw_sync_module_pubsub_rest.cc @@ -48,9 +48,10 @@ void RGWPSCreateTopicOp::execute() ups = std::make_unique(store, s->owner.get_id()); op_ret = ups->create_topic(topic_name, dest, topic_arn); if (op_ret < 0) { - ldout(s->cct, 1) << "failed to create topic, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl; return; } + ldout(s->cct, 20) << "successfully created topic '" << topic_name << "'" << dendl; } // command: PUT /topics/[&push-endpoint=[&=]] @@ -121,6 +122,7 @@ void RGWPSListTopicsOp::execute() ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl; return; } + ldout(s->cct, 20) << "successfully got topics" << dendl; } // command: GET /topics @@ -177,6 +179,7 @@ void RGWPSGetTopicOp::execute() ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; return; } + ldout(s->cct, 1) << "successfully got topic '" << topic_name << "'" << dendl; } // command: GET /topics/ @@ -235,9 +238,10 @@ void RGWPSDeleteTopicOp::execute() ups = std::make_unique(store, s->owner.get_id()); op_ret = ups->remove_topic(topic_name); if (op_ret < 0) { - ldout(s->cct, 1) << "failed to remove topic, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl; return; } + ldout(s->cct, 1) << "successfully removed topic '" << topic_name << "'" << dendl; } // command: DELETE /topics/ @@ -324,9 +328,10 @@ void RGWPSCreateSubOp::execute() auto sub = ups->get_sub(sub_name); op_ret = sub->subscribe(topic_name, dest); if (op_ret < 0) { - ldout(s->cct, 1) << "failed to create subscription, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl; return; } + ldout(s->cct, 20) << "successfully created subscription '" << sub_name << "'" << dendl; } // command: PUT /subscriptions/?topic=[&push-endpoint=[&=]]... @@ -387,9 +392,10 @@ void RGWPSGetSubOp::execute() auto sub = ups->get_sub(sub_name); op_ret = sub->get_conf(&result); if (op_ret < 0) { - ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl; return; } + ldout(s->cct, 20) << "successfully got subscription '" << sub_name << "'" << dendl; } // command: GET /subscriptions/ @@ -448,9 +454,10 @@ void RGWPSDeleteSubOp::execute() auto sub = ups->get_sub(sub_name); op_ret = sub->unsubscribe(topic_name); if (op_ret < 0) { - ldout(s->cct, 1) << "failed to remove subscription, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl; return; } + ldout(s->cct, 20) << "successfully removed subscription '" << sub_name << "'" << dendl; } // command: DELETE /subscriptions/ @@ -498,9 +505,10 @@ void RGWPSAckSubEventOp::execute() auto sub = ups->get_sub_with_events(sub_name); op_ret = sub->remove_event(event_id); if (op_ret < 0) { - ldout(s->cct, 1) << "failed to ack event, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl; return; } + ldout(s->cct, 20) << "successfully acked event on subscription '" << sub_name << "'" << dendl; } // command: POST /subscriptions/?ack&event-id= @@ -561,14 +569,15 @@ void RGWPSPullSubEventsOp::execute() sub = ups->get_sub_with_events(sub_name); if (!sub) { op_ret = -ENOENT; - ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl; return; } op_ret = sub->list_events(marker, max_entries); if (op_ret < 0) { - ldout(s->cct, 1) << "failed to get subscription events, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to get events from subscription '" << sub_name << "', ret=" << op_ret << dendl; return; } + ldout(s->cct, 20) << "successfully got events from subscription '" << sub_name << "'" << dendl; } // command: GET /subscriptions/?events[&max-entries=][&marker=] @@ -746,9 +755,10 @@ void RGWPSCreateNotif_ObjStore_Ceph::execute() auto b = ups->get_bucket(bucket_info.bucket); op_ret = b->create_notification(topic_name, events); if (op_ret < 0) { - ldout(s->cct, 1) << "failed to create notification, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl; return; } + ldout(s->cct, 20) << "successfully created notification for topic '" << topic_name << "'" << dendl; } namespace { @@ -923,7 +933,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() { dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + unique_topic_name; dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/"; auto sub = ups->get_sub(sub_name); - op_ret = sub->subscribe(unique_topic_name, dest, c.id); + op_ret = sub->subscribe(unique_topic_name, dest, sub_name); if (op_ret < 0) { ldout(s->cct, 1) << "failed to auto-generate subscription '" << sub_name << "', ret=" << op_ret << dendl; // rollback generated notification (ignore return value) @@ -932,6 +942,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() { ups->remove_topic(unique_topic_name); return; } + ldout(s->cct, 20) << "successfully auto-generated subscription '" << sub_name << "'" << dendl; } } @@ -1001,9 +1012,10 @@ void RGWPSDeleteNotif_ObjStore_Ceph::execute() { auto b = ups->get_bucket(bucket_info.bucket); op_ret = b->remove_notification(topic_name); if (op_ret < 0) { - ldout(s->cct, 1) << "failed to remove notification, ret=" << op_ret << dendl; + ldout(s->cct, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl; return; } + ldout(s->cct, 20) << "successfully removed notification from topic '" << topic_name << "'" << dendl; } // command (extension to S3): DELETE /bucket?notification[=] diff --git a/src/rgw/rgw_tools.cc b/src/rgw/rgw_tools.cc index 1fb18aee8abac..dcec9c43aa683 100644 --- a/src/rgw/rgw_tools.cc +++ b/src/rgw/rgw_tools.cc @@ -129,6 +129,39 @@ int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& o return ret; } +int rgw_put_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive, + RGWObjVersionTracker *objv_tracker, real_time set_mtime, map *pattrs) +{ + map no_attrs; + if (!pattrs) { + pattrs = &no_attrs; + } + + rgw_raw_obj obj(pool, oid); + + auto sysobj = obj_ctx.get_obj(obj); + int ret = sysobj.wop() + .set_objv_tracker(objv_tracker) + .set_exclusive(exclusive) + .set_mtime(set_mtime) + .set_attrs(*pattrs) + .write(data, null_yield); + + if (ret == -ENOENT) { + ret = rgwstore->create_pool(pool); + if (ret >= 0) { + ret = sysobj.wop() + .set_objv_tracker(objv_tracker) + .set_exclusive(exclusive) + .set_mtime(set_mtime) + .set_attrs(*pattrs) + .write(data, null_yield); + } + } + + return ret; +} + int rgw_get_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl, RGWObjVersionTracker *objv_tracker, real_time *pmtime, map *pattrs, rgw_cache_entry_info *cache_info, boost::optional refresh_version) diff --git a/src/rgw/rgw_tools.h b/src/rgw/rgw_tools.h index 927915e9ef0cd..30c9581d1bf29 100644 --- a/src/rgw/rgw_tools.h +++ b/src/rgw/rgw_tools.h @@ -24,6 +24,8 @@ int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool, int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive, RGWObjVersionTracker *objv_tracker, real_time set_mtime, map *pattrs = NULL); +int rgw_put_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive, + RGWObjVersionTracker *objv_tracker, real_time set_mtime, map *pattrs = NULL); int rgw_get_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl, RGWObjVersionTracker *objv_tracker, real_time *pmtime, map *pattrs = NULL, rgw_cache_entry_info *cache_info = NULL, diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 8846f5a800880..3d7c16e5b2c53 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -172,9 +172,12 @@ def test_ps_s3_notification_low_level(): # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX generated_topic_name = notification_name+'_'+topic_name - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, - notification_name, topic_arn, ['s3:ObjectCreated:*']) - response, status = s3_notification_conf.set_config() + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*'] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) zone_meta_checkpoint(ps_zones[0].zone) # get auto-generated topic @@ -198,7 +201,7 @@ def test_ps_s3_notification_low_level(): assert_equal(status/100, 2) assert_equal(parsed_result['topic'], generated_topic_name) # delete s3 notification - _, status = s3_notification_conf.del_config(all_notifications=False) + _, status = s3_notification_conf.del_config(notification=notification_name) assert_equal(status/100, 2) # delete topic _, status = topic_conf.del_config() @@ -241,9 +244,12 @@ def test_ps_s3_notification_records(): topic_arn = parsed_result['arn'] # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, - notification_name, topic_arn, ['s3:ObjectCreated:*']) - response, status = s3_notification_conf.set_config() + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*'] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) zone_meta_checkpoint(ps_zones[0].zone) # get auto-generated subscription @@ -295,15 +301,20 @@ def test_ps_s3_notification(): topic_arn = parsed_result['arn'] # create one s3 notification notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1' - s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name, - notification_name1, topic_arn, ['s3:ObjectCreated:*']) + topic_conf_list = [{'Id': notification_name1, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*'] + }] + s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) response, status = s3_notification_conf1.set_config() assert_equal(status/100, 2) - # create another s3 notification + # create another s3 notification with the same topic notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2' - - s3_notification_conf2 = PSNotificationS3(ps_zones[0].conn, bucket_name, - notification_name2, topic_arn, ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']) + topic_conf_list = [{'Id': notification_name2, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'] + }] + s3_notification_conf2 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) response, status = s3_notification_conf2.set_config() assert_equal(status/100, 2) zone_meta_checkpoint(ps_zones[0].zone) @@ -316,19 +327,19 @@ def test_ps_s3_notification(): assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn) # get specific notification on a bucket - response, status = s3_notification_conf1.get_config(all_notifications=False) + response, status = s3_notification_conf1.get_config(notification=notification_name1) assert_equal(status/100, 2) assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn) assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name1) - response, status = s3_notification_conf2.get_config(all_notifications=False) + response, status = s3_notification_conf2.get_config(notification=notification_name2) assert_equal(status/100, 2) assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn) assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name2) # delete specific notifications - _, status = s3_notification_conf1.del_config(all_notifications=False) + _, status = s3_notification_conf1.del_config(notification=notification_name1) assert_equal(status/100, 2) - _, status = s3_notification_conf2.del_config(all_notifications=False) + _, status = s3_notification_conf2.del_config(notification=notification_name2) assert_equal(status/100, 2) # cleanup @@ -361,7 +372,8 @@ def test_ps_topic(): _, status = topic_conf.del_config() assert_equal(status/100, 2) # verift topic is deleted - result, _ = topic_conf.get_config() + result, status = topic_conf.get_config() + assert_equal(status, 404) parsed_result = json.loads(result) assert_equal(parsed_result['Code'], 'NoSuchKey') @@ -976,8 +988,11 @@ def test_ps_s3_push_http(): zone_meta_checkpoint(ps_zones[0].zone) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, - notification_name, topic_arn, ['s3:ObjectCreated:*']) + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*'] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket @@ -1054,7 +1069,7 @@ def test_ps_push_amqp(): def test_ps_s3_push_amqp(): - """ test pushing to amqp endpoint n s3 record format""" + """ test pushing to amqp endpoint s3 record format""" return SkipTest("PubSub push tests are only manual") zones, ps_zones = init_env() bucket_name = gen_bucket_name() @@ -1066,15 +1081,19 @@ def test_ps_s3_push_amqp(): endpoint_args='amqp-exchange=ex1&amqp-ack-level=none') result, status = topic_conf.set_config() assert_equal(status/100, 2) - topic_arn = 'arn:aws:sns:::' + topic_name + parsed_result = json.loads(result) + topic_arn = parsed_result['arn'] # create bucket on the first of the rados zones bucket = zones[0].create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zones[0].zone) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, - notification_name, topic_arn, ['s3:ObjectCreated:*']) + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*'] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket @@ -1118,8 +1137,11 @@ def test_ps_delete_bucket(): topic_arn = parsed_result['arn'] # create one s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, - notification_name, topic_arn, ['s3:ObjectCreated:*']) + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*'] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) @@ -1156,7 +1178,7 @@ def test_ps_delete_bucket(): verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False) # s3 notification is deleted with bucket - _, status = s3_notification_conf.get_config(all_notifications=False) + _, status = s3_notification_conf.get_config(notification=notification_name) assert_equal(status, 404) # non-s3 notification is deleted with bucket _, status = notification_conf.get_config() @@ -1179,8 +1201,11 @@ def test_ps_missing_topic(): # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_arn = 'arn:aws:sns:::' + topic_name - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, - notification_name, topic_arn, ['s3:ObjectCreated:*']) + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*'] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) try: s3_notification_conf.set_config() except: @@ -1190,3 +1215,246 @@ def test_ps_missing_topic(): # cleanup zones[0].delete_bucket(bucket_name) + + +def test_ps_s3_topic_update(): + """ test updating topic associated with a notification""" + return SkipTest("PubSub push tests are only manual") + zones, ps_zones = init_env() + bucket_name = gen_bucket_name() + topic_name = bucket_name+TOPIC_SUFFIX + + # create topic + dest_endpoint1 = 'amqp://localhost' + dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none' + dest_endpoint2 = 'http://localhost:9001' + topic_conf = PSTopic(ps_zones[0].conn, topic_name, + endpoint=dest_endpoint1, + endpoint_args=dest_args1) + result, status = topic_conf.set_config() + parsed_result = json.loads(result) + topic_arn = parsed_result['arn'] + assert_equal(status/100, 2) + # get topic + result, _ = topic_conf.get_config() + # verify topic content + parsed_result = json.loads(result) + assert_equal(parsed_result['topic']['name'], topic_name) + assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint1) + + # create bucket on the first of the rados zones + bucket = zones[0].create_bucket(bucket_name) + # wait for sync + zone_meta_checkpoint(ps_zones[0].zone) + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*'] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + _, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + # create objects in the bucket + number_of_objects = 10 + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + key.set_contents_from_string('bar') + # wait for sync + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + + # TODO: check update to amqp + + # update the same topic + topic_conf = PSTopic(ps_zones[0].conn, topic_name, + endpoint=dest_endpoint2) + _, status = topic_conf.set_config() + assert_equal(status/100, 2) + # get topic + result, _ = topic_conf.get_config() + # verify topic content + parsed_result = json.loads(result) + assert_equal(parsed_result['topic']['name'], topic_name) + assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint2) + + # create more objects in the bucket + number_of_objects = 10 + for i in range(number_of_objects): + key = bucket.new_key(str(i+100)) + key.set_contents_from_string('bar') + # wait for sync + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + + # TODO: check it is still updating amqp + + # update notification to update the endpoint from the topic + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*'] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + _, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + # create even more objects in the bucket + number_of_objects = 10 + for i in range(number_of_objects): + key = bucket.new_key(str(i+200)) + key.set_contents_from_string('bar') + # wait for sync + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + + # TODO: check that updates switched to http + + # cleanup + # delete objects from the bucket + for key in bucket.list(): + key.delete() + s3_notification_conf.del_config() + topic_conf.del_config() + zones[0].delete_bucket(bucket_name) + + +def test_ps_s3_notification_update(): + """ test updating the topic of a notification""" + return SkipTest("PubSub push tests are only manual") + zones, ps_zones = init_env() + bucket_name = gen_bucket_name() + topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX + + # create first topic + dest_endpoint1 = 'amqp://localhost' + dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none' + topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1, + endpoint=dest_endpoint1, + endpoint_args=dest_args1) + result, status = topic_conf1.set_config() + parsed_result = json.loads(result) + topic_arn1 = parsed_result['arn'] + assert_equal(status/100, 2) + + # create bucket on the first of the rados zones + bucket = zones[0].create_bucket(bucket_name) + # wait for sync + zone_meta_checkpoint(ps_zones[0].zone) + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn1, + 'Events': ['s3:ObjectCreated:*'] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + _, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + # create objects in the bucket + number_of_objects = 10 + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + key.set_contents_from_string('bar') + # wait for sync + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + result, _ = s3_notification_conf.get_config() + + # TODO: check updates to amqp + + # create another topic + topic_name2 = bucket_name+'http'+TOPIC_SUFFIX + dest_endpoint2 = 'http://localhost:9001' + topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2, + endpoint=dest_endpoint2) + result, status = topic_conf2.set_config() + parsed_result = json.loads(result) + topic_arn2 = parsed_result['arn'] + assert_equal(status/100, 2) + + # update notification to the new topic + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn2, + 'Events': ['s3:ObjectCreated:*'] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + _, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + # create more objects in the bucket + number_of_objects = 10 + for i in range(number_of_objects): + key = bucket.new_key(str(i+200)) + key.set_contents_from_string('bar') + # wait for sync + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + + # TODO: check uodate to http + result, _ = s3_notification_conf.get_config() + + # cleanup + # delete objects from the bucket + for key in bucket.list(): + key.delete() + s3_notification_conf.del_config() + topic_conf1.del_config() + topic_conf2.del_config() + zones[0].delete_bucket(bucket_name) + + +def test_ps_s3_multiple_topics_notification(): + """ test notification creation with multiple topics""" + zones, ps_zones = init_env() + bucket_name = gen_bucket_name() + topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX + topic_name2 = bucket_name+'http'+TOPIC_SUFFIX + + # create topics + dest_endpoint1 = 'amqp://localhost' + dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none' + dest_endpoint2 = 'http://localhost:9001' + topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1, + endpoint=dest_endpoint1, + endpoint_args=dest_args1) + result, status = topic_conf1.set_config() + parsed_result = json.loads(result) + topic_arn1 = parsed_result['arn'] + assert_equal(status/100, 2) + topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2, + endpoint=dest_endpoint2) + result, status = topic_conf2.set_config() + parsed_result = json.loads(result) + topic_arn2 = parsed_result['arn'] + assert_equal(status/100, 2) + + # create bucket on the first of the rados zones + bucket = zones[0].create_bucket(bucket_name) + # wait for sync + zone_meta_checkpoint(ps_zones[0].zone) + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [ + { + 'Id': notification_name + '_1', + 'TopicArn': topic_arn1, + 'Events': ['s3:ObjectCreated:*'] + }, + { + 'Id': notification_name + '_2', + 'TopicArn': topic_arn2, + 'Events': ['s3:ObjectCreated:*'] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + _, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + result, _ = s3_notification_conf.get_config() + print('first try') + print(result) + # FIXME: this is currently failing + #assert_equal(len(result['TopicConfigurations']), 2) + + _, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + result, _ = s3_notification_conf.get_config() + print('second try') + print(result) + assert_equal(len(result['TopicConfigurations']), 2) + + # cleanup + s3_notification_conf.del_config() + topic_conf1.del_config() + topic_conf2.del_config() + zones[0].delete_bucket(bucket_name) diff --git a/src/test/rgw/rgw_multi/zone_ps.py b/src/test/rgw/rgw_multi/zone_ps.py index ca97d4c57b6c6..0073adc4e561c 100644 --- a/src/test/rgw/rgw_multi/zone_ps.py +++ b/src/test/rgw/rgw_multi/zone_ps.py @@ -154,14 +154,12 @@ class PSNotificationS3: GET /?notification[=] DELETE /?notification[=] """ - def __init__(self, conn, bucket_name, notification, topic_arn, events=None): + def __init__(self, conn, bucket_name, topic_conf_list): self.conn = conn assert bucket_name.strip() self.bucket_name = bucket_name self.resource = '/'+bucket_name - self.notification = notification - self.topic_arn = topic_arn - self.events = events + self.topic_conf_list = topic_conf_list self.client = boto3.client('s3', endpoint_url='http://'+conn.host+':'+str(conn.port), aws_access_key_id=conn.aws_access_key_id, @@ -173,14 +171,14 @@ class PSNotificationS3: return make_request(self.conn, method, self.resource, parameters=parameters, sign_parameters=True) - def get_config(self, all_notifications=True): + def get_config(self, notification=None): """get notification info""" parameters = None - if all_notifications: + if notification is None: response = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name) status = response['ResponseMetadata']['HTTPStatusCode'] return response, status - parameters = {'notification': self.notification} + parameters = {'notification': notification} response, status = self.send_request('GET', parameters=parameters) dict_response = xmltodict.parse(response) return dict_response, status @@ -189,24 +187,14 @@ class PSNotificationS3: """set notification""" response = self.client.put_bucket_notification_configuration(Bucket=self.bucket_name, NotificationConfiguration={ - 'TopicConfigurations': [ - { - 'Id': self.notification, - 'TopicArn': self.topic_arn, - 'Events': self.events, - } - ] + 'TopicConfigurations': self.topic_conf_list }) status = response['ResponseMetadata']['HTTPStatusCode'] return response, status - def del_config(self, all_notifications=True): + def del_config(self, notification=None): """delete notification""" - parameters = None - if all_notifications: - parameters = {'notification': None} - else: - parameters = {'notification': self.notification} + parameters = {'notification': notification} return self.send_request('DELETE', parameters) diff --git a/src/test/rgw/test_multi.py b/src/test/rgw/test_multi.py index ebfc5b505a1e5..e3863c5a9a6e0 100644 --- a/src/test/rgw/test_multi.py +++ b/src/test/rgw/test_multi.py @@ -70,6 +70,8 @@ class Cluster(multisite.Cluster): env = os.environ.copy() env['CEPH_NUM_MDS'] = '0' cmd += ['-n'] + # cmd += ['-o'] + # cmd += ['rgw_cache_enabled=false'] bash(cmd, env=env) self.needs_reset = False