]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: handle subscription conf errors better 27530/head
authorYuval Lifshitz <yuvalif@yahoo.com>
Thu, 11 Apr 2019 17:16:52 +0000 (20:16 +0300)
committerYuval Lifshitz <yuvalif@yahoo.com>
Thu, 11 Apr 2019 17:16:52 +0000 (20:16 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
src/rgw/rgw_perf_counters.cc
src/rgw/rgw_perf_counters.h
src/rgw/rgw_sync_module_pubsub.cc
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py

index f5c3f8746ce987955bf12eba6232a209a4b10f23..d12daff2e14c3c4c56260de87708191c8d0e08a2 100644 (file)
@@ -42,6 +42,7 @@ int rgw_perf_start(CephContext *cct)
   plb.add_u64_counter(l_rgw_pubsub_push_ok, "pubsub_push_ok", "Pubsub events pushed to an endpoint");
   plb.add_u64_counter(l_rgw_pubsub_push_failed, "pubsub_push_failed", "Pubsub events failed to be pushed to an endpoint");
   plb.add_u64(l_rgw_pubsub_push_pending, "pubsub_push_pending", "Pubsub events pending reply from endpoint");
+  plb.add_u64_counter(l_rgw_pubsub_missing_conf, "pubsub_missing_conf", "Pubsub events could not be handled because of missing configuration");
   
   perfcounter = plb.create_perf_counters();
   cct->get_perfcounters_collection()->add(perfcounter);
index f119508b9896c8b5c13144c7e89cc98dd43b5544..3220e5068bd80b99b9145603e01bdff2c9e5b07e 100644 (file)
@@ -41,6 +41,7 @@ enum {
   l_rgw_pubsub_push_ok,
   l_rgw_pubsub_push_failed,
   l_rgw_pubsub_push_pending,
+  l_rgw_pubsub_missing_conf,
 
   l_rgw_last,
 };
index 098892c391f839779649c9412a10fd3af7f6fea3..25c1d5227067804a4458b9f60b168bf7abc4b747 100644 (file)
@@ -674,7 +674,7 @@ class PSSubscription {
                                                   sync_env->store,
                                                   lc_config));
         if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
+          ldout(sync_env->cct, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
           return set_cr_error(retcode);
         }
 
@@ -801,12 +801,10 @@ class PSSubscription {
                                             sync_env->store,
                                             put_obj));
         if (retcode < 0) {
-          ldout(sync_env->cct, 1) << "ERROR: failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
-          if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
+          ldout(sync_env->cct, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
           return set_cr_error(retcode);
         } else {
           ldout(sync_env->cct, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl;
-          if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
         }
 
         return set_cr_done();
@@ -836,15 +834,13 @@ class PSSubscription {
         yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env));
       
         if (retcode < 0) {
-          ldout(sync_env->cct, 10) << "ERROR: failed to push event: " << event->id <<
+          ldout(sync_env->cct, 10) << "failed to push event: " << event->id <<
             " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl;
-          if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
           return set_cr_error(retcode);
         }
         
-        ldout(sync_env->cct, 10) << "event: " << event->id <<
+        ldout(sync_env->cct, 20) << "event: " << event->id <<
           " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl;
-        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
         return set_cr_done();
       }
       return 0;
@@ -938,7 +934,7 @@ class PSManager
       reenter(this) {
         if (owner.empty()) {
           if (!conf->find_sub(sub_name, &sub_conf)) {
-            ldout(sync_env->cct, 1) << "ERROR: could not find subscription config: name=" << sub_name << dendl;
+            ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl;
             mgr->remove_get_sub(owner, sub_name);
             return set_cr_error(-ENOENT);
           }
@@ -965,7 +961,7 @@ class PSManager
 
         yield (*ref)->call_init_cr(this);
         if (retcode < 0) {
-          ldout(sync_env->cct, 1) << "ERROR: failed to init subscription" << dendl;
+          ldout(sync_env->cct, 10) << "failed to init subscription" << dendl;
           mgr->remove_get_sub(owner, sub_name);
           return set_cr_error(retcode);
         }
@@ -1223,20 +1219,24 @@ public:
 
       if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
 
+      // loop over all topics related to the bucket/object
       for (titer = topics->begin(); titer != topics->end(); ++titer) {
         ldout(sync_env->cct, 20) << ": notification for " << event->source << ": topic=" << 
           (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
-
+        // loop over all subscriptions of the topic
         for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
           ldout(sync_env->cct, 20) << ": subscription: " << *siter << dendl;
           has_subscriptions = true;
           sub_conf_found = false;
+          // try to read subscription configuration from global/user cond
+          // configuration is considered missing only if does not exist in either
           for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
-            /*
-             * once for the global subscriptions, once for the user specific subscriptions
-             */
             yield PSManager::call_get_subscription_cr(sync_env, env->manager, this, *oiter, *siter, &sub);
             if (retcode < 0) {
+              if (sub_conf_found) {
+                // not a real issue, sub conf already found
+                retcode = 0;
+              }
               last_sub_conf_error = retcode;
               continue;
             }
@@ -1246,16 +1246,20 @@ public:
               ldout(sync_env->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
               yield call(PSSubscription::store_event_cr(sync_env, sub, event));
               if (retcode < 0) {
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
                 ldout(sync_env->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
               } else {
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
                 event_handled = true;
               }
               if (sub->sub_conf->push_endpoint) {
-                  ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
+                ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
                 yield call(PSSubscription::push_event_cr(sync_env, sub, event));
                 if (retcode < 0) {
+                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
                   ldout(sync_env->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
                 } else {
+                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
                   event_handled = true;
                 }
               } 
@@ -1265,16 +1269,20 @@ public:
               record->configurationId = sub->sub_conf->s3_id;
               yield call(PSSubscription::store_event_cr(sync_env, sub, record));
               if (retcode < 0) {
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
                 ldout(sync_env->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
               } else {
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
                 event_handled = true;
               }
               if (sub->sub_conf->push_endpoint) {
                   ldout(sync_env->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
                 yield call(PSSubscription::push_event_cr(sync_env, sub, record));
                 if (retcode < 0) {
+                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
                   ldout(sync_env->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
                 } else {
+                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
                   event_handled = true;
                 }
               }
@@ -1282,8 +1290,14 @@ public:
           }
           if (!sub_conf_found) {
             // could not find conf for subscription at user or global levels
+            if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
             ldout(sync_env->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter 
               << " ret=" << last_sub_conf_error << dendl;
+              if (retcode == -ENOENT) {
+                // missing subscription info should be reflected back as invalid argument
+                // and not as missing object
+                retcode =  -EINVAL;
+              }
           }
         }
       }
@@ -1404,7 +1418,7 @@ public:
                                              EVENT_NAME_OBJECT_CREATE,
                                              &topics));
       if (retcode < 0) {
-        ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
+        ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
         return set_cr_error(retcode);
       }
       if (topics->empty()) {
@@ -1446,7 +1460,7 @@ public:
                                                              mtime(_mtime), event_name(get_event_name(_event_type)) {}
   int operate() override {
     reenter(this) {
-      ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
+      ldout(sync_env->cct, 20) << ": remove remote obj: z=" << sync_env->source_zone
                                << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
       yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_name, &topics));
       if (retcode < 0) {
index 24ce68a29be040f8f5ddaaf0ee6bfd9bc13767d4..8f95523d3a44e510ce69a3d59d451be607a44f0a 100644 (file)
@@ -9,7 +9,7 @@ from rgw_multi.tests import get_realm, \
     zone_data_checkpoint, \
     check_bucket_eq, \
     gen_bucket_name
-from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3
+from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
 from nose import SkipTest
 from nose.tools import assert_not_equal, assert_equal
 
@@ -126,6 +126,26 @@ NOTIFICATION_SUFFIX = "_notif"
 # pubsub tests
 ##############
 
+def test_ps_info():
+    """ log information for manual testing """
+    return SkipTest("only used in manual testing")
+    zones, ps_zones = init_env()
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    bucket_name = gen_bucket_name()
+    # create bucket on the first of the rados zones
+    bucket = zones[0].create_bucket(bucket_name)
+    # create objects in the bucket
+    number_of_objects = 10
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        key.set_contents_from_string('bar')
+    print('Zonegroup: ' + zonegroup.name)
+    print('Master Zone')
+    print_connection_info(zones[0].conn)
+    print('PubSub Zone')
+    print_connection_info(ps_zones[0].conn)
+    print('Bucket: ' + bucket_name)
 
 def test_ps_s3_notification_low_level():
     """ test low level implementation of s3 notifications """
@@ -855,6 +875,7 @@ def test_ps_versioned_deletion():
 
 def test_ps_push_http():
     """ test pushing to http endpoint """
+    return SkipTest("PubSub push tests are only manual")
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
@@ -901,6 +922,7 @@ def test_ps_push_http():
 
 def test_ps_s3_push_http():
     """ test pushing to http endpoint n s3 record format"""
+    return SkipTest("PubSub push tests are only manual")
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
@@ -946,6 +968,7 @@ def test_ps_s3_push_http():
 
 def test_ps_push_amqp():
     """ test pushing to amqp endpoint """
+    return SkipTest("PubSub push tests are only manual")
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
@@ -995,6 +1018,7 @@ def test_ps_push_amqp():
 
 def test_ps_s3_push_amqp():
     """ test pushing to amqp endpoint n s3 record format"""
+    return SkipTest("PubSub push tests are only manual")
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
@@ -1104,3 +1128,29 @@ def test_ps_delete_bucket():
     # cleanup
     sub_conf.del_config()
     topic_conf.del_config()
+
+
+def test_ps_missing_topic():
+    """ test creating a subscription when no topic info exists"""
+    zones, ps_zones = init_env()
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name+TOPIC_SUFFIX
+
+    # create bucket on the first of the rados zones
+    zones[0].create_bucket(bucket_name)
+    # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_arn = 'arn:aws:sns:::' + topic_name
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
+                                            notification_name, topic_arn, ['s3:ObjectCreated:*'])
+    try:
+        s3_notification_conf.set_config()
+    except:
+        print('missing topic is expected')
+    else:
+        assert 'missing topic is expected'
+
+    # cleanup
+    zones[0].delete_bucket(bucket_name)
index 0c7fd74dcb567f1416f32b4b3aea6691e381c9e7..ca97d4c57b6c6d7f07e811ce730ef8aaa2df75aa 100644 (file)
@@ -34,6 +34,13 @@ class PSZone(Zone):  # pylint: disable=too-many-ancestors
 NO_HTTP_BODY = ''
 
 
+def print_connection_info(conn):
+    """print connection details"""
+    print('Endpoint: ' + conn.host + ':' + str(conn.port))
+    print('AWS Access Key:: ' + conn.aws_access_key_id)
+    print('AWS Secret Key:: ' + conn.aws_secret_access_key)
+
+
 def make_request(conn, method, resource, parameters=None, sign_parameters=False, extra_parameters=None):
     """generic request sending to pubsub radogw
     should cover: topics, notificatios and subscriptions