sync_env->store,
lc_config));
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
+ ldout(sync_env->cct, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
return set_cr_error(retcode);
}
sync_env->store,
put_obj));
if (retcode < 0) {
- ldout(sync_env->cct, 1) << "ERROR: failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
+ ldout(sync_env->cct, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
return set_cr_error(retcode);
} else {
ldout(sync_env->cct, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl;
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
}
return set_cr_done();
yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env));
if (retcode < 0) {
- ldout(sync_env->cct, 10) << "ERROR: failed to push event: " << event->id <<
+ ldout(sync_env->cct, 10) << "failed to push event: " << event->id <<
" to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl;
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return set_cr_error(retcode);
}
- ldout(sync_env->cct, 10) << "event: " << event->id <<
+ ldout(sync_env->cct, 20) << "event: " << event->id <<
" pushed to endpoint: " << sub_conf->push_endpoint_name << dendl;
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
return set_cr_done();
}
return 0;
reenter(this) {
if (owner.empty()) {
if (!conf->find_sub(sub_name, &sub_conf)) {
- ldout(sync_env->cct, 1) << "ERROR: could not find subscription config: name=" << sub_name << dendl;
+ ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl;
mgr->remove_get_sub(owner, sub_name);
return set_cr_error(-ENOENT);
}
yield (*ref)->call_init_cr(this);
if (retcode < 0) {
- ldout(sync_env->cct, 1) << "ERROR: failed to init subscription" << dendl;
+ ldout(sync_env->cct, 10) << "failed to init subscription" << dendl;
mgr->remove_get_sub(owner, sub_name);
return set_cr_error(retcode);
}
if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
+ // loop over all topics related to the bucket/object
for (titer = topics->begin(); titer != topics->end(); ++titer) {
ldout(sync_env->cct, 20) << ": notification for " << event->source << ": topic=" <<
(*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
-
+ // loop over all subscriptions of the topic
for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
ldout(sync_env->cct, 20) << ": subscription: " << *siter << dendl;
has_subscriptions = true;
sub_conf_found = false;
+ // try to read subscription configuration from global/user conf
+ // configuration is considered missing only if it does not exist in either
for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
- /*
- * once for the global subscriptions, once for the user specific subscriptions
- */
yield PSManager::call_get_subscription_cr(sync_env, env->manager, this, *oiter, *siter, &sub);
if (retcode < 0) {
+ if (sub_conf_found) {
+ // not a real issue, sub conf already found
+ retcode = 0;
+ }
last_sub_conf_error = retcode;
continue;
}
ldout(sync_env->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
yield call(PSSubscription::store_event_cr(sync_env, sub, event));
if (retcode < 0) {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
ldout(sync_env->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
} else {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
event_handled = true;
}
if (sub->sub_conf->push_endpoint) {
- ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
+ ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
yield call(PSSubscription::push_event_cr(sync_env, sub, event));
if (retcode < 0) {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
ldout(sync_env->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
} else {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
event_handled = true;
}
}
record->configurationId = sub->sub_conf->s3_id;
yield call(PSSubscription::store_event_cr(sync_env, sub, record));
if (retcode < 0) {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
ldout(sync_env->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
} else {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
event_handled = true;
}
if (sub->sub_conf->push_endpoint) {
ldout(sync_env->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
yield call(PSSubscription::push_event_cr(sync_env, sub, record));
if (retcode < 0) {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
ldout(sync_env->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
} else {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
event_handled = true;
}
}
}
if (!sub_conf_found) {
// could not find conf for subscription at user or global levels
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
ldout(sync_env->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
<< " ret=" << last_sub_conf_error << dendl;
+ if (retcode == -ENOENT) {
+ // missing subscription info should be reflected back as invalid argument
+ // and not as missing object
+ retcode = -EINVAL;
+ }
}
}
}
EVENT_NAME_OBJECT_CREATE,
&topics));
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
+ ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
return set_cr_error(retcode);
}
if (topics->empty()) {
mtime(_mtime), event_name(get_event_name(_event_type)) {}
int operate() override {
reenter(this) {
- ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
+ ldout(sync_env->cct, 20) << ": remove remote obj: z=" << sync_env->source_zone
<< " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_name, &topics));
if (retcode < 0) {
zone_data_checkpoint, \
check_bucket_eq, \
gen_bucket_name
-from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3
+from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal
# pubsub tests
##############
+def test_ps_info():
+    """ log information for manual testing """
+    return SkipTest("only used in manual testing")
+    zones, ps_zones = init_env()
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    bucket_name = gen_bucket_name()
+    # create bucket on the first of the rados zones
+    bucket = zones[0].create_bucket(bucket_name)
+    # create objects in the bucket
+    number_of_objects = 10
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        key.set_contents_from_string('bar')
+    print('Zonegroup: ' + zonegroup.name)
+    print('Master Zone')
+    print_connection_info(zones[0].conn)
+    print('PubSub Zone')
+    print_connection_info(ps_zones[0].conn)
+    print('Bucket: ' + bucket_name)
def test_ps_s3_notification_low_level():
""" test low level implementation of s3 notifications """
def test_ps_push_http():
""" test pushing to http endpoint """
+ return SkipTest("PubSub push tests are only manual")
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
def test_ps_s3_push_http():
""" test pushing to http endpoint n s3 record format"""
+ return SkipTest("PubSub push tests are only manual")
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
def test_ps_push_amqp():
""" test pushing to amqp endpoint """
+ return SkipTest("PubSub push tests are only manual")
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
def test_ps_s3_push_amqp():
""" test pushing to amqp endpoint n s3 record format"""
+ return SkipTest("PubSub push tests are only manual")
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# cleanup
sub_conf.del_config()
topic_conf.del_config()
+
+
+def test_ps_missing_topic():
+    """ test creating a subscription when no topic info exists """
+    zones, ps_zones = init_env()
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name+TOPIC_SUFFIX
+
+    # create bucket on the first of the rados zones
+    zones[0].create_bucket(bucket_name)
+    # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_arn = 'arn:aws:sns:::' + topic_name
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
+                                            notification_name, topic_arn, ['s3:ObjectCreated:*'])
+    try:
+        s3_notification_conf.set_config()
+    except Exception:
+        print('missing topic is expected')
+    else:
+        assert False, 'missing topic is expected'
+
+    # cleanup
+    zones[0].delete_bucket(bucket_name)
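A more compact way to express the same expected failure, shown only as a sketch (it assumes assert_raises from nose.tools, which this test module does not currently import):

    from nose.tools import assert_raises
    # setting an s3 notification that references a topic which was never created
    # is expected to fail, so the call itself becomes the assertion
    assert_raises(Exception, s3_notification_conf.set_config)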