From: Yuval Lifshitz Date: Thu, 11 Apr 2019 17:16:52 +0000 (+0300) Subject: rgw/pubsub: handle subscription conf errors better X-Git-Tag: v15.1.0~2929^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F27530%2Fhead;p=ceph.git rgw/pubsub: handle subscription conf errors better Signed-off-by: Yuval Lifshitz --- diff --git a/src/rgw/rgw_perf_counters.cc b/src/rgw/rgw_perf_counters.cc index f5c3f8746ce9..d12daff2e14c 100644 --- a/src/rgw/rgw_perf_counters.cc +++ b/src/rgw/rgw_perf_counters.cc @@ -42,6 +42,7 @@ int rgw_perf_start(CephContext *cct) plb.add_u64_counter(l_rgw_pubsub_push_ok, "pubsub_push_ok", "Pubsub events pushed to an endpoint"); plb.add_u64_counter(l_rgw_pubsub_push_failed, "pubsub_push_failed", "Pubsub events failed to be pushed to an endpoint"); plb.add_u64(l_rgw_pubsub_push_pending, "pubsub_push_pending", "Pubsub events pending reply from endpoint"); + plb.add_u64_counter(l_rgw_pubsub_missing_conf, "pubsub_missing_conf", "Pubsub events could not be handled because of missing configuration"); perfcounter = plb.create_perf_counters(); cct->get_perfcounters_collection()->add(perfcounter); diff --git a/src/rgw/rgw_perf_counters.h b/src/rgw/rgw_perf_counters.h index f119508b9896..3220e5068bd8 100644 --- a/src/rgw/rgw_perf_counters.h +++ b/src/rgw/rgw_perf_counters.h @@ -41,6 +41,7 @@ enum { l_rgw_pubsub_push_ok, l_rgw_pubsub_push_failed, l_rgw_pubsub_push_pending, + l_rgw_pubsub_missing_conf, l_rgw_last, }; diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 098892c391f8..25c1d5227067 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -674,7 +674,7 @@ class PSSubscription { sync_env->store, lc_config)); if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl; + ldout(sync_env->cct, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl; return set_cr_error(retcode); } @@ -801,12 +801,10 @@ class PSSubscription { sync_env->store, put_obj)); if (retcode < 0) { - ldout(sync_env->cct, 1) << "ERROR: failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl; - if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); + ldout(sync_env->cct, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl; return set_cr_error(retcode); } else { ldout(sync_env->cct, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl; - if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); } return set_cr_done(); @@ -836,15 +834,13 @@ class PSSubscription { yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env)); if (retcode < 0) { - ldout(sync_env->cct, 10) << "ERROR: failed to push event: " << event->id << + ldout(sync_env->cct, 10) << "failed to push event: " << event->id << " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl; - if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); return set_cr_error(retcode); } - ldout(sync_env->cct, 10) << "event: " << event->id << + ldout(sync_env->cct, 20) << "event: " << event->id << " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl; - if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); return set_cr_done(); } return 0; @@ -938,7 +934,7 @@ class PSManager reenter(this) { if (owner.empty()) { if (!conf->find_sub(sub_name, &sub_conf)) { - ldout(sync_env->cct, 1) << "ERROR: could not find subscription config: name=" << sub_name << dendl; + ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl; mgr->remove_get_sub(owner, sub_name); return set_cr_error(-ENOENT); } @@ -965,7 +961,7 @@ class PSManager yield (*ref)->call_init_cr(this); if (retcode < 0) { - ldout(sync_env->cct, 1) << "ERROR: failed to init subscription" << dendl; + ldout(sync_env->cct, 10) << "failed to init subscription" << dendl; mgr->remove_get_sub(owner, sub_name); return set_cr_error(retcode); } @@ -1223,20 +1219,24 @@ public: if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered); + // loop over all topics related to the bucket/object for (titer = topics->begin(); titer != topics->end(); ++titer) { ldout(sync_env->cct, 20) << ": notification for " << event->source << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl; - + // loop over all subscriptions of the topic for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) { ldout(sync_env->cct, 20) << ": subscription: " << *siter << dendl; has_subscriptions = true; sub_conf_found = false; + // try to read subscription configuration from global/user cond + // configuration is considered missing only if does not exist in either for (oiter = owners.begin(); oiter != owners.end(); ++oiter) { - /* - * once for the global subscriptions, once for the user specific subscriptions - */ yield PSManager::call_get_subscription_cr(sync_env, env->manager, this, *oiter, *siter, &sub); if (retcode < 0) { + if (sub_conf_found) { + // not a real issue, sub conf already found + retcode = 0; + } last_sub_conf_error = retcode; continue; } @@ -1246,16 +1246,20 @@ public: ldout(sync_env->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; yield call(PSSubscription::store_event_cr(sync_env, sub, event)); if (retcode < 0) { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); ldout(sync_env->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; } else { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); event_handled = true; } if (sub->sub_conf->push_endpoint) { - ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; + ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; yield call(PSSubscription::push_event_cr(sync_env, sub, event)); if (retcode < 0) { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); ldout(sync_env->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl; } else { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); event_handled = true; } } @@ -1265,16 +1269,20 @@ public: record->configurationId = sub->sub_conf->s3_id; yield call(PSSubscription::store_event_cr(sync_env, sub, record)); if (retcode < 0) { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); ldout(sync_env->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl; } else { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); event_handled = true; } if (sub->sub_conf->push_endpoint) { ldout(sync_env->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; yield call(PSSubscription::push_event_cr(sync_env, sub, record)); if (retcode < 0) { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); ldout(sync_env->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl; } else { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); event_handled = true; } } @@ -1282,8 +1290,14 @@ public: } if (!sub_conf_found) { // could not find conf for subscription at user or global levels + if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf); ldout(sync_env->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << last_sub_conf_error << dendl; + if (retcode == -ENOENT) { + // missing subscription info should be reflected back as invalid argument + // and not as missing object + retcode = -EINVAL; + } } } } @@ -1404,7 +1418,7 @@ public: EVENT_NAME_OBJECT_CREATE, &topics)); if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; + ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; return set_cr_error(retcode); } if (topics->empty()) { @@ -1446,7 +1460,7 @@ public: mtime(_mtime), event_name(get_event_name(_event_type)) {} int operate() override { reenter(this) { - ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone + ldout(sync_env->cct, 20) << ": remove remote obj: z=" << sync_env->source_zone << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl; yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_name, &topics)); if (retcode < 0) { diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 24ce68a29be0..8f95523d3a44 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -9,7 +9,7 @@ from rgw_multi.tests import get_realm, \ zone_data_checkpoint, \ check_bucket_eq, \ gen_bucket_name -from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3 +from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3, print_connection_info from nose import SkipTest from nose.tools import assert_not_equal, assert_equal @@ -126,6 +126,26 @@ NOTIFICATION_SUFFIX = "_notif" # pubsub tests ############## +def test_ps_info(): + """ log information for manual testing """ + return SkipTest("only used in manual testing") + zones, ps_zones = init_env() + realm = get_realm() + zonegroup = realm.master_zonegroup() + bucket_name = gen_bucket_name() + # create bucket on the first of the rados zones + bucket = zones[0].create_bucket(bucket_name) + # create objects in the bucket + number_of_objects = 10 + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + key.set_contents_from_string('bar') + print('Zonegroup: ' + zonegroup.name) + print('Master Zone') + print_connection_info(zones[0].conn) + print('PubSub Zone') + print_connection_info(ps_zones[0].conn) + print('Bucket: ' + bucket_name) def test_ps_s3_notification_low_level(): """ test low level implementation of s3 notifications """ @@ -855,6 +875,7 @@ def test_ps_versioned_deletion(): def test_ps_push_http(): """ test pushing to http endpoint """ + return SkipTest("PubSub push tests are only manual") zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX @@ -901,6 +922,7 @@ def test_ps_push_http(): def test_ps_s3_push_http(): """ test pushing to http endpoint n s3 record format""" + return SkipTest("PubSub push tests are only manual") zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX @@ -946,6 +968,7 @@ def test_ps_s3_push_http(): def test_ps_push_amqp(): """ test pushing to amqp endpoint """ + return SkipTest("PubSub push tests are only manual") zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX @@ -995,6 +1018,7 @@ def test_ps_push_amqp(): def test_ps_s3_push_amqp(): """ test pushing to amqp endpoint n s3 record format""" + return SkipTest("PubSub push tests are only manual") zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX @@ -1104,3 +1128,29 @@ def test_ps_delete_bucket(): # cleanup sub_conf.del_config() topic_conf.del_config() + + +def test_ps_missing_topic(): + """ test creating a subscription when no topic info exists""" + zones, ps_zones = init_env() + bucket_name = gen_bucket_name() + topic_name = bucket_name+TOPIC_SUFFIX + + # create bucket on the first of the rados zones + zones[0].create_bucket(bucket_name) + # wait for sync + zone_meta_checkpoint(ps_zones[0].zone) + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_arn = 'arn:aws:sns:::' + topic_name + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, + notification_name, topic_arn, ['s3:ObjectCreated:*']) + try: + s3_notification_conf.set_config() + except: + print('missing topic is expected') + else: + assert 'missing topic is expected' + + # cleanup + zones[0].delete_bucket(bucket_name) diff --git a/src/test/rgw/rgw_multi/zone_ps.py b/src/test/rgw/rgw_multi/zone_ps.py index 0c7fd74dcb56..ca97d4c57b6c 100644 --- a/src/test/rgw/rgw_multi/zone_ps.py +++ b/src/test/rgw/rgw_multi/zone_ps.py @@ -34,6 +34,13 @@ class PSZone(Zone): # pylint: disable=too-many-ancestors NO_HTTP_BODY = '' +def print_connection_info(conn): + """print connection details""" + print('Endpoint: ' + conn.host + ':' + str(conn.port)) + print('AWS Access Key:: ' + conn.aws_access_key_id) + print('AWS Secret Key:: ' + conn.aws_secret_access_key) + + def make_request(conn, method, resource, parameters=None, sign_parameters=False, extra_parameters=None): """generic request sending to pubsub radogw should cover: topics, notificatios and subscriptions