[&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
[&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
[&Attributes.entry.6.key=ca-location&Attributes.entry.6.value=<file path>]
+ [&Attributes.entry.7.key=OpaqueData&Attributes.entry.7.value=<opaque data>]
Request parameters:
- push-endpoint: URI of an endpoint to send push notification to
+- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic
+
- HTTP endpoint
- URI: ``http[s]://<fqdn>[:<port]``
<EndpointTopic></EndpointTopic>
</EndPoint>
<TopicArn></TopicArn>
+ <OpaqueData></OpaqueData>
</Topic>
</GetTopicResult>
<ResponseMetadata>
<EndpointTopic></EndpointTopic>
</EndPoint>
<TopicArn></TopicArn>
+ <OpaqueData></OpaqueData>
</member>
</Topics>
</ListTopicsResult>
}
},
"eventId":"",
+ "opaqueData":"",
}
]}
- s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API)
- s3.object.tags: any tags set on the objcet (an extension to the S3 notification API)
- s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
+- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic (an extension to the S3 notification API)
.. _PubSub Module : ../pubsub-module
.. _S3 Notification Compatibility: ../s3-notification-compatibility
::
- PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
+ PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
Request parameters:
- push-endpoint: URI of an endpoint to send push notification to
+- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic
The endpoint URI may include parameters depending with the type of endpoint:
"push_endpoint_topic":""
},
"arn":""
+ "opaqueData":""
},
"subs":[]
}
}
},
"eventId":"",
+ "opaqueData":"",
}
]}
- s3.object.metadata: not supported (an extension to the S3 notification API)
- s3.object.tags: not supported (an extension to the S3 notification API)
- s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
+- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic (an extension to the S3 notification API)
In case that the subscription was not created via a non S3-compatible notification,
the events will have the following event format (JSON):
- ``DeleteTopic``
- ``ListTopics``
-We also extend it by:
+We also have the following extensions to topic configuration:
+
+ - In ``GetTopic`` we allow fetching a specific topic, instead of all user topics
+ - In ``CreateTopic``
+
+ - we allow setting endpoint attributes
+ - we allow setting opaque data thta will be sent to the endpoint in the notification
- - ``GetTopic`` - allowing for fetching a specific topic, instead of all user topics
- - In ``CreateTopic`` we allow setting endpoint attributes
.. _AWS Simple Notification Service API: https://docs.aws.amazon.com/sns/latest/api/API_Operations.html
.. _AWS S3 Bucket Notifications API: https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
record.userIdentity = s->user->user_id.id; // user that triggered the change
record.x_amz_request_id = s->req_id; // request ID of the original change
record.x_amz_id_2 = s->host_id; // RGW on which the change was made
- // configurationId is filled from subscription configuration
+ // configurationId is filled from notification configuration
record.bucket_name = s->bucket_name;
record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
record.bucket_arn = to_string(rgw::ARN(s->bucket));
record.x_meta_map = s->info.x_meta_map;
// pass tags
record.tags = s->tagset.get_tags();
+ // opaque data will be filled from topic configuration
}
bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) {
}
event_should_be_handled = true;
record.configurationId = topic_filter.s3_id;
+ record.opaque_data = topic_cfg.opaque_data;
ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id <<
"' on topic: '" << topic_cfg.dest.arn_topic <<
"' and bucket: '" << s->bucket.name <<
}
}
encode_json("eventId", id, f);
+ encode_json("opaqueData", opaque_data, f);
}
void rgw_pubsub_event::dump(Formatter *f) const
encode_json("name", name, f);
encode_json("dest", dest, f);
encode_json("arn", arn, f);
+ encode_json("opaqueData", opaque_data, f);
}
void rgw_pubsub_topic::dump_xml(Formatter *f) const
encode_xml("Name", name, f);
encode_xml("EndPoint", dest, f);
encode_xml("TopicArn", arn, f);
+ encode_xml("OpaqueData", opaque_data, f);
}
void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
}
int RGWUserPubSub::create_topic(const string& name) {
- return create_topic(name, rgw_pubsub_sub_dest(), "");
+ return create_topic(name, rgw_pubsub_sub_dest(), "", "");
}
-int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn) {
+int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data) {
RGWObjVersionTracker objv_tracker;
rgw_pubsub_user_topics topics;
new_topic.topic.name = name;
new_topic.topic.dest = dest;
new_topic.topic.arn = arn;
+ new_topic.topic.opaque_data = opaque_data;
ret = write_user_topics(topics, &objv_tracker);
if (ret < 0) {
using OptionalFilter = std::optional<rgw_s3_filter>;
-class rgw_pubsub_topic_filter;
+struct rgw_pubsub_topic_filter;
/* S3 notification configuration
* based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
KeyValueList x_meta_map;
// tags
KeyValueList tags;
+ // opaque data received from the topic
+ // could be used to identify the gateway
+ std::string opaque_data;
void encode(bufferlist& bl) const {
- ENCODE_START(3, 1, bl);
+ ENCODE_START(4, 1, bl);
encode(eventVersion, bl);
encode(eventSource, bl);
encode(awsRegion, bl);
encode(bucket_id, bl);
encode(x_meta_map, bl);
encode(tags, bl);
+ encode(opaque_data, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(3, bl);
+ DECODE_START(4, bl);
decode(eventVersion, bl);
decode(eventSource, bl);
decode(awsRegion, bl);
decode(object_sequencer, bl);
decode(id, bl);
if (struct_v >= 2) {
- decode(bucket_id, bl);
- decode(x_meta_map, bl);
+ decode(bucket_id, bl);
+ decode(x_meta_map, bl);
}
if (struct_v >= 3) {
- decode(tags, bl);
+ decode(tags, bl);
+ }
+ if (struct_v >= 4) {
+ decode(opaque_data, bl);
}
DECODE_FINISH(bl);
}
std::string name;
rgw_pubsub_sub_dest dest;
std::string arn;
+ std::string opaque_data;
void encode(bufferlist& bl) const {
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl);
encode(user, bl);
encode(name, bl);
encode(dest, bl);
encode(arn, bl);
+ encode(opaque_data, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(2, bl);
+ DECODE_START(3, bl);
decode(user, bl);
decode(name, bl);
if (struct_v >= 2) {
decode(dest, bl);
decode(arn, bl);
}
+ if (struct_v >= 3) {
+ decode(opaque_data, bl);
+ }
DECODE_FINISH(bl);
}
// create a topic with push destination information and ARN
// if the topic already exists the destination and ARN values may be updated (considered succsess)
// return 0 on success, error code otherwise
- int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn);
+ int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data);
// remove a topic according to its name
// if the topic does not exists it is a no-op (considered success)
// return 0 on success, error code otherwise
return -EINVAL;
}
+ opaque_data = s->info.args.get("OpaqueData");
+
dest.push_endpoint = s->info.args.get("push-endpoint");
if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
// generate the internal topic. destination is stored here for the "push-only" case
// when no subscription exists
// ARN is cached to make the "GET" method faster
- op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn);
+ op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;
}
ups.emplace(store, s->owner.get_id());
- op_ret = ups->create_topic(topic_name, dest, topic_arn);
+ op_ret = ups->create_topic(topic_name, dest, topic_arn, opaque_data);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
std::string topic_name;
rgw_pubsub_sub_dest dest;
std::string topic_arn;
+ std::string opaque_data;
virtual int get_params() = 0;
struct PSTopicConfig {
std::string name;
std::set<std::string> subs;
+ std::string opaque_data;
void dump(Formatter *f) const {
encode_json("name", name, f);
encode_json("subs", subs, f);
+ encode_json("opaque", opaque_data, f);
}
};
using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
struct PSConfig {
- string id{"pubsub"};
+ const std::string id{"pubsub"};
rgw_user user;
- string data_bucket_prefix;
- string data_oid_prefix;
+ std::string data_bucket_prefix;
+ std::string data_oid_prefix;
int events_retention_days{0};
}
{
Formatter::ObjectSection section(*f, "notifications");
- string last;
+ std::string last;
for (auto& notif : notifications) {
const string& n = notif.first;
if (n != last) {
}
void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
- string path = bucket.name + "/" + key.name;
+ const std::string path = bucket.name + "/" + key.name;
auto iter = notifications.upper_bound(path);
if (iter == notifications.begin()) {
std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
tc->name = info.name;
tc->subs = user_topics.topics[info.name].subs;
+ tc->opaque_data = info.opaque_data;
(*topics)->push_back(tc);
}
PSSubscriptionRef sub;
std::array<rgw_user, 2>::const_iterator oiter;
std::vector<PSTopicConfigRef>::const_iterator titer;
- std::set<string>::const_iterator siter;
+ std::set<std::string>::const_iterator siter;
int last_sub_conf_error;
public:
// subscription was made by S3 compatible API
ldout(sync_env->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
record->configurationId = sub->sub_conf->s3_id;
+ record->opaque_data = (*titer)->opaque_data;
yield call(PSSubscription::store_event_cr(sync_env, sub, record));
if (retcode < 0) {
if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
{
std::vector<std::pair<std::string, std::string> > attrs;
for (auto& attr : attrs) {
- string k = attr.first;
+ std::string k = attr.first;
if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
}
RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
{
data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
- string jconf = json_str("conf", *data_handler->get_conf());
+ const std::string jconf = json_str("conf", *data_handler->get_conf());
JSONParser p;
if (!p.parse(jconf.c_str(), jconf.size())) {
ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
topic_name = s->object.name;
+ opaque_data = s->info.args.get("OpaqueData");
dest.push_endpoint = s->info.args.get("push-endpoint");
if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
worker.reset_events()
verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
+ def get_and_reset_events(self):
+ events = []
+ for worker in self.workers:
+ events += worker.get_events()
+ worker.reset_events()
+ return events
+
def close(self):
"""close all workers in the http server and wait for it to finish"""
# make sure that the shared socket is closed
http_server.close()
+def test_ps_s3_opaque_data():
+ """ test that opaque id set in topic, is sent in notification """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ zones, ps_zones = init_env(require_ps=True)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+ # wait for sync
+ zone_meta_checkpoint(ps_zones[0].zone)
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ opaque_data = 'http://1.2.3.4:8888'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
+ topic_conf = PSTopic(ps_zones[0].conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
+ result, status = topic_conf.set_config()
+ assert_equal(status/100, 2)
+ parsed_result = json.loads(result)
+ topic_arn = parsed_result['arn']
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ client_threads = []
+ content = 'bar'
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ # wait for sync
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+
+ # check http receiver
+ keys = list(bucket.list())
+ print('total number of objects: ' + str(len(keys)))
+ events = http_server.get_and_reset_events()
+ for event in events:
+ assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+
+ # cleanup
+ for key in keys:
+ key.delete()
+ [thr.join() for thr in client_threads]
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+ http_server.close()
+
+
+def test_ps_s3_opaque_data_on_master():
+ """ test that opaque id set in topic, is sent in notification on master """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ zones, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ opaque_data = 'http://1.2.3.4:8888'
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ client_threads = []
+ start_time = time.time()
+ content = 'bar'
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver
+ keys = list(bucket.list())
+ print('total number of objects: ' + str(len(keys)))
+ events = http_server.get_and_reset_events()
+ for event in events:
+ assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+
+ # cleanup
+ for key in keys:
+ key.delete()
+ [thr.join() for thr in client_threads]
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+ http_server.close()
+
def test_ps_topic():
""" test set/get/delete of topic """
_, ps_zones = init_env()
# check amqp receiver
event_count = 0
for event in receiver.get_and_reset_events():
- assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key)
- assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['val'], meta_value)
+ s3_event = event['Records'][0]['s3']
+ assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
+ assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
event_count +=1
# only PUT and POST has the metadata value
# check amqp receiver
event_count = 0
for event in receiver.get_and_reset_events():
- assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key)
- assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['val'], meta_value)
+ s3_event = event['Records'][0]['s3']
+ assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
+ assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
event_count +=1
# all 3 object has metadata when deleted
class PSTopicS3:
"""class to set/list/get/delete a topic
- POST ?Action=CreateTopic&Name=<topic name>&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
+ POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
POST ?Action=ListTopics
POST ?Action=GetTopic&TopicArn=<topic-arn>
POST ?Action=DeleteTopic&TopicArn=<topic-arn>
"""
- def __init__(self, conn, topic_name, region, endpoint_args=None):
+ def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
self.conn = conn
self.topic_name = topic_name.strip()
assert self.topic_name
self.attributes = {}
if endpoint_args is not None:
self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
- protocol = 'https' if conn.is_secure else 'http'
- self.client = boto3.client('sns',
- endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
- aws_access_key_id=conn.aws_access_key_id,
- aws_secret_access_key=conn.aws_secret_access_key,
- region_name=region,
- verify='./cert.pem',
- config=Config(signature_version='s3'))
+ if opaque_data is not None:
+ self.attributes['OpaqueData'] = opaque_data
+ protocol = 'https' if conn.is_secure else 'http'
+ self.client = boto3.client('sns',
+ endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key,
+ region_name=region,
+ verify='./cert.pem',
+ config=Config(signature_version='s3'))
def get_config(self):