Bucket notifications provide a mechanism for sending information out of the radosgw when certain events are happening on the bucket.
Currently, notifications could be sent to: HTTP, AMQP0.9.1 and Kafka endpoints.
-Note, that if the events should be stored in Ceph, in addition, or instead of being pushed to an endpoint,
+Note, that if the events should be stored in Ceph, in addition, or instead of being pushed to an endpoint,
the `PubSub Module`_ should be used instead of the bucket notification mechanism.
-A user can create different topics. A topic entity is defined by its user and its name. A
-user can only manage its own topics, and can only associate them with buckets it owns.
+A user can create different topics. A topic entity is defined by its name and is per tenant. A
+user can only associate its topics (via notification configuration) with buckets it owns.
In order to send notifications for events for a specific bucket, a notification entity needs to be created. A
notification can be created on a subset of event types, or for all event types (default).
- ``pubsub_push_fail``: running counter, for all notifications, of events failed to be pushed to their endpoint
- ``pubsub_push_pending``: gauge value of events pushed to an endpoint but not acked or nacked yet
-.. note::
+.. note::
- ``pubsub_event_triggered`` and ``pubsub_event_lost`` are incremented per event, while:
+ ``pubsub_event_triggered`` and ``pubsub_event_lost`` are incremented per event, while:
``pubsub_push_ok``, ``pubsub_push_fail``, are incremented per push action on each notification.
Bucket Notification REST API
Upon a successful request, the response will include the topic ARN that could be later used to reference this topic in the notification request.
To update a topic, use the same command used for topic creation, with the topic name of an existing topic and different endpoint values.
-.. tip:: Any notification already associated with the topic needs to be re-created for the topic update to take effect
+.. tip:: Any notification already associated with the topic needs to be re-created for the topic update to take effect
::
POST
Action=CreateTopic
&Name=<topic-name>
- &push-endpoint=<endpoint>
[&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=<exchange>]
[&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker|routable]
[&Attributes.entry.3.key=verify-ssl&Attributes.entry.3.value=true|false]
[&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
[&Attributes.entry.6.key=ca-location&Attributes.entry.6.value=<file path>]
[&Attributes.entry.7.key=OpaqueData&Attributes.entry.7.value=<opaque data>]
+ [&Attributes.entry.8.key=push-endpoint&Attributes.entry.8.value=<endpoint>]
Request parameters:
- push-endpoint: URI of an endpoint to send push notification to
-- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic
+- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic
-- HTTP endpoint
+- HTTP endpoint
- URI: ``http[s]://<fqdn>[:<port]``
- port defaults to: 80/443 for HTTP/S accordingly
- URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
- user/password defaults to: guest/guest
- - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
+ - user/password may only be provided over HTTPS. If not, topic creation request will be rejected.
- port defaults to: 5672
- vhost defaults to: "/"
- - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
+ - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1). Different topics pointing to the same endpoint must use the same exchange
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
- "none": message is considered "delivered" if sent to broker
- "broker": message is considered "delivered" if acked by broker (default)
- "routable": message is considered "delivered" if broker can route to a consumer
-- Kafka endpoint
+.. tip:: The topic-name is used for the AMQP topic ("routing key" for a topic exchange)
+
+- Kafka endpoint
- URI: ``kafka://[<user>:<password>@]<fqdn>[:<port]``
- if ``use-ssl`` is set to "true", secure connection will be used for connecting with the broker ("false" by default)
- if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
- - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
- - user/password may only be provided together with ``use-ssl``, connection to the broker would fail if not
+ - user/password may only be provided over HTTPS. If not, topic creation request will be rejected.
+ - user/password may only be provided together with ``use-ssl``, if not, the connection to the broker would fail.
- port defaults to: 9092
- kafka-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
- "none": message is considered "delivered" if sent to broker
- "broker": message is considered "delivered" if acked by broker (default)
-.. note::
+.. note::
- The key/value of a specific parameter does not have to reside in the same line, or in any specific order, but must use the same index
- Attribute indexing does not need to be sequential or start from any specific value
<ResponseMetadata>
<RequestId></RequestId>
</ResponseMetadata>
- </CreateTopicResponse>
+ </CreateTopicResponse>
The topic ARN in the response will have the following format:
::
arn:aws:sns:<zone-group>:<tenant>:<topic>
-
+
Get Topic Information
`````````````````````
::
POST
- Action=GetTopic&TopicArn=<topic-arn>
+
+ Action=GetTopic
+ &TopicArn=<topic-arn>
Response will have the following format:
<ResponseMetadata>
<RequestId></RequestId>
</ResponseMetadata>
- </GetTopicResponse>
+ </GetTopicResponse>
- User: name of the user that created the topic
- Name: name of the topic
-- EndPoinjtAddress: the push-endpoint URL
-- if endpoint URL contain user/password information, request must be made over HTTPS. Topic get request will be rejected if not
-- EndPointArgs: the push-endpoint args
+- EndpointAddress: the push-endpoint URL
+- if endpoint URL contain user/password information, request must be made over HTTPS. If not, topic get request will be rejected.
+- EndpointArgs: the push-endpoint args
- EndpointTopic: the topic name that should be sent to the endpoint (mat be different than the above topic name)
- TopicArn: topic ARN
::
POST
- Action=DeleteTopic&TopicArn=<topic-arn>
+
+ Action=DeleteTopic
+ &TopicArn=<topic-arn>
Delete the specified topic. Note that deleting a deleted topic should result with no-op and not a failure.
<ResponseMetadata>
<RequestId></RequestId>
</ResponseMetadata>
- </DeleteTopicResponse>
+ </DeleteTopicResponse>
List Topics
```````````
-List all topics that user defined.
+List all topics associated with a tenant.
::
- POST
+ POST
+
Action=ListTopics
-
+
Response will have the following format:
::
<ResponseMetadata>
<RequestId></RequestId>
</ResponseMetadata>
- </ListTopicsResponse>
+ </ListTopicsResponse>
-- if endpoint URL contain user/password information, in any of the topic, request must be made over HTTPS. Topic list request will be rejected if not
+- if endpoint URL contain user/password information, in any of the topic, request must be made over HTTPS. If not, topic list request will be rejected.
Notifications
~~~~~~~~~~~~~
Detailed under: `Bucket Operations`_.
-.. note::
+.. note::
- "Abort Multipart Upload" request does not emit a notification
- - "Delete Multiple Objects" request does not emit a notification
- Both "Initiate Multipart Upload" and "POST Object" requests will emit an ``s3:ObjectCreated:Post`` notification
~~~~~~
The events are in JSON format (regardless of the actual endpoint), and share the same structure as the S3-compatible events
-pushed or pulled using the pubsub sync module.
+pushed or pulled using the pubsub sync module. For example:
::
- {"Records":[
+ {"Records":[
{
- "eventVersion":"2.1"
- "eventSource":"aws:s3",
- "awsRegion":"",
- "eventTime":"",
- "eventName":"",
- "userIdentity":{
- "principalId":""
+ "eventVersion":"2.1",
+ "eventSource":"ceph:s3",
+ "awsRegion":"us-east-1",
+ "eventTime":"2019-11-22T13:47:35.124724Z",
+ "eventName":"s3:ObjectCreated:Put",
+ "userIdentity":{
+ "principalId":"tester"
},
"requestParameters":{
"sourceIPAddress":""
},
"responseElements":{
- "x-amz-request-id":"",
- "x-amz-id-2":""
+ "x-amz-request-id":"503a4c37-85eb-47cd-8681-2817e80b4281.5330.903595",
+ "x-amz-id-2":"14d2-zone1-zonegroup1"
},
"s3":{
"s3SchemaVersion":"1.0",
- "configurationId":"",
+ "configurationId":"mynotif1",
"bucket":{
- "name":"",
+ "name":"mybucket1",
"ownerIdentity":{
- "principalId":""
+ "principalId":"tester"
},
- "arn":"",
- "id:""
+ "arn":"arn:aws:s3:us-east-1::mybucket1",
+ "id":"503a4c37-85eb-47cd-8681-2817e80b4281.5332.38"
},
"object":{
- "key":"",
- "size":"",
- "eTag":"",
+ "key":"myimage1.jpg",
+ "size":"1024",
+ "eTag":"37b51d194a7513e45b56f6524f2d51f2",
"versionId":"",
- "sequencer": "",
+ "sequencer": "F7E6D75DC742D108",
"metadata":[],
"tags":[]
}
},
"eventId":"",
- "opaqueData":"",
+ "opaqueData":"me@example.com"
}
]}
- eventName: for list of supported events see: `S3 Notification Compatibility`_
- userIdentity.principalId: user that triggered the change
- requestParameters.sourceIPAddress: not supported
-- responseElements.x-amz-request-id: request ID of the original change
-- responseElements.x_amz_id_2: RGW on which the change was made
+- responseElements.x-amz-request-id: request ID of the original change
+- responseElements.x_amz_id_2: RGW on which the change was made
- s3.configurationId: notification ID that created the event
- s3.bucket.name: name of the bucket
- s3.bucket.ownerIdentity.principalId: owner of the bucket
- s3.object.eTag: object etag
- s3.object.version: object version in case of versioned bucket
- s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format)
-- s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API)
-- s3.object.tags: any tags set on the objcet (an extension to the S3 notification API)
+- s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API)
+- s3.object.tags: any tags set on the object (an extension to the S3 notification API)
- s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
-- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic (an extension to the S3 notification API)
+- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic (an extension to the S3 notification API)
.. _PubSub Module : ../pubsub-module
.. _S3 Notification Compatibility: ../s3-notification-compatibility
AMQP0.9.1 and Kafka endpoints. In this case, the events are pushed to an endpoint on top of storing them in Ceph. If events should only be pushed to an endpoint
and do not need to be stored in Ceph, the `Bucket Notification`_ mechanism should be used instead of pubsub sync module.
-A user can create different topics. A topic entity is defined by its user and its name. A
-user can only manage its own topics, and can only subscribe to events published by buckets
-it owns.
+A user can create different topics. A topic entity is defined by its name and is per tenant. A
+user can only associate its topics (via notification configuration) with buckets it owns.
In order to publish events for specific bucket a notification entity needs to be created. A
notification can be created on a subset of event types, or for all event types (default).
together, although it is recommended to use the S3-compatible one.
The S3-compatible API is similar to the one used in the bucket notification mechanism.
-Events are stored as RGW objects in a special bucket, under a special user. Events cannot
+Events are stored as RGW objects in a special bucket, under a special user (pubsub control user). Events cannot
be accessed directly, but need to be pulled and acked using the new REST API.
.. toctree::
PubSub Zone Configuration
-------------------------
-The pubsub sync module requires the creation of a new zone in a `Multisite`_ environment.
+The pubsub sync module requires the creation of a new zone in a :ref:`multisite` environment...
First, a master zone must exist (see: :ref:`master-zone-label`),
then a secondary zone should be created (see :ref:`secondary-zone-label`).
In the creation of the secondary zone, its tier type must be set to ``pubsub``:
- user/password may only be provided over HTTPS. Topic creation request will be rejected if not
- port defaults to: 5672
- vhost defaults to: "/"
- - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
+ - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1). Different topics pointing to the same endpoint must use the same exchange
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
- "none": message is considered "delivered" if sent to broker
- "broker": message is considered "delivered" if acked by broker (default)
- "routable": message is considered "delivered" if broker can route to a consumer
+.. tip:: The topic-name is used for the AMQP topic ("routing key" for a topic exchange)
+
- Kafka endpoint
- URI: ``kafka://[<user>:<password>@]<fqdn>[:<port]``
List Topics
```````````
-List all topics that user defined.
+List all topics associated with a tenant.
::
- event-id: id of event to be acked
-.. _Multisite : ../multisite
.. _Bucket Notification : ../notifications
.. _Bucket Operations: ../s3/bucketops
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
+ RGWPubSub ps(store, tenant);
rgw_bucket bucket;
return -ret;
}
- auto b = ups.get_bucket(bucket_info.bucket);
+ auto b = ps.get_bucket(bucket_info.bucket);
ret = b->get_topics(&result);
if (ret < 0) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
}
encode_json("result", result, formatter);
} else {
- rgw_pubsub_user_topics result;
- int ret = ups.get_user_topics(&result);
+ rgw_pubsub_topics result;
+ int ret = ps.get_topics(&result);
if (ret < 0) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
return -ret;
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
+ RGWPubSub ps(store, tenant);
- ret = ups.create_topic(topic_name);
+ ret = ps.create_topic(topic_name);
if (ret < 0) {
cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
return -ret;
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
+
+ RGWPubSub ps(store, tenant);
rgw_pubsub_topic_subs topic;
- ret = ups.get_topic(topic_name, &topic);
+ ret = ps.get_topic(topic_name, &topic);
if (ret < 0) {
cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
return -ret;
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
if (bucket_name.empty()) {
cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
return EINVAL;
}
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
+
+ RGWPubSub ps(store, tenant);
rgw_bucket bucket;
return -ret;
}
- auto b = ups.get_bucket(bucket_info.bucket);
+ auto b = ps.get_bucket(bucket_info.bucket);
ret = b->create_notification(topic_name, event_types);
if (ret < 0) {
cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl;
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
if (bucket_name.empty()) {
cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
return EINVAL;
}
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
+ RGWPubSub ups(store, tenant);
rgw_bucket bucket;
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
- ret = ups.remove_topic(topic_name);
+ RGWPubSub ps(store, tenant);
+
+ ret = ps.remove_topic(topic_name);
if (ret < 0) {
cerr << "ERROR: could not remove topic: " << cpp_strerror(-ret) << std::endl;
return -ret;
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
if (sub_name.empty()) {
cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
return EINVAL;
}
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
+
+ RGWPubSub ps(store, tenant);
rgw_pubsub_sub_config sub_conf;
- auto sub = ups.get_sub(sub_name);
+ auto sub = ps.get_sub(sub_name);
ret = sub->get_conf(&sub_conf);
if (ret < 0) {
cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
if (sub_name.empty()) {
cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
return EINVAL;
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
+ RGWPubSub ps(store, tenant);
rgw_pubsub_topic_subs topic;
- int ret = ups.get_topic(topic_name, &topic);
+ int ret = ps.get_topic(topic_name, &topic);
if (ret < 0) {
cerr << "ERROR: topic not found" << std::endl;
return EINVAL;
auto conf = psmodule->get_effective_conf();
if (dest_config.bucket_name.empty()) {
- dest_config.bucket_name = string(conf["data_bucket_prefix"]) + user_info.user_id.to_str() + "-" + topic.topic.name;
+ dest_config.bucket_name = string(conf["data_bucket_prefix"]) + user_id.to_str() + "-" + topic.topic.name;
}
if (dest_config.oid_prefix.empty()) {
dest_config.oid_prefix = conf["data_oid_prefix"];
}
- auto sub = ups.get_sub(sub_name);
+ auto sub = ps.get_sub(sub_name);
ret = sub->subscribe(topic_name, dest_config);
if (ret < 0) {
cerr << "ERROR: could not store subscription info: " << cpp_strerror(-ret) << std::endl;
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
if (sub_name.empty()) {
cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
return EINVAL;
}
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
- auto sub = ups.get_sub(sub_name);
+ RGWPubSub ps(store, tenant);
+
+ auto sub = ps.get_sub(sub_name);
ret = sub->unsubscribe(topic_name);
if (ret < 0) {
cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
if (sub_name.empty()) {
cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
return EINVAL;
}
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
+
+ RGWPubSub ps(store, tenant);
if (!max_entries_specified) {
- max_entries = RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS;
+ max_entries = RGWPubSub::Sub::DEFAULT_MAX_EVENTS;
}
- auto sub = ups.get_sub(sub_name);
+ auto sub = ps.get_sub_with_events(sub_name);
ret = sub->list_events(marker, max_entries);
if (ret < 0) {
cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
if (sub_name.empty()) {
cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
return EINVAL;
cerr << "ERROR: event id was not provided (via --event-id)" << std::endl;
return EINVAL;
}
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
- auto sub = ups.get_sub(sub_name);
+ RGWPubSub ps(store, tenant);
+
+ auto sub = ps.get_sub_with_events(sub_name);
ret = sub->remove_event(event_id);
if (ret < 0) {
cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl;
const std::string& etag,
EventType event_type,
rgw::sal::RGWRadosStore* store) {
- RGWUserPubSub ps_user(store, s->user->get_id());
- RGWUserPubSub::Bucket ps_bucket(&ps_user, s->bucket);
+ RGWPubSub ps_user(store, s->user->get_id().tenant);
+ RGWPubSub::Bucket ps_bucket(&ps_user, s->bucket);
rgw_pubsub_bucket_topics bucket_topics;
auto rc = ps_bucket.get_topics(&bucket_topics);
if (rc < 0) {
}
}
-void rgw_pubsub_user_topics::dump(Formatter *f) const
+void rgw_pubsub_topics::dump(Formatter *f) const
{
Formatter::ArraySection s(*f, "topics");
for (auto& t : topics) {
}
}
-void rgw_pubsub_user_topics::dump_xml(Formatter *f) const
+void rgw_pubsub_topics::dump_xml(Formatter *f) const
{
for (auto& t : topics) {
encode_xml("member", t.second.topic, f);
encode_json("s3_id", s3_id, f);
}
-RGWUserPubSub::RGWUserPubSub(rgw::sal::RGWRadosStore* _store, const rgw_user& _user) :
+RGWPubSub::RGWPubSub(rgw::sal::RGWRadosStore* _store, const std::string& _tenant) :
store(_store),
- user(_user),
+ tenant(_tenant),
obj_ctx(store->svc()->sysobj->init_obj_ctx()) {
- get_user_meta_obj(&user_meta_obj);
+ get_meta_obj(&meta_obj);
}
-int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::remove(const rgw_raw_obj& obj,
+ RGWObjVersionTracker *objv_tracker)
{
int ret = rgw_delete_system_obj(store->svc()->sysobj, obj.pool, obj.oid, objv_tracker);
if (ret < 0) {
return 0;
}
-int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv_tracker)
{
- int ret = read(user_meta_obj, result, objv_tracker);
+ int ret = read(meta_obj, result, objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::write_topics(const rgw_pubsub_topics& topics,
+ RGWObjVersionTracker *objv_tracker)
{
- int ret = write(user_meta_obj, topics, objv_tracker);
+ int ret = write(meta_obj, topics, objv_tracker);
if (ret < 0 && ret != -ENOENT) {
ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics *result)
+int RGWPubSub::get_topics(rgw_pubsub_topics *result)
{
- return read_user_topics(result, nullptr);
+ return read_topics(result, nullptr);
}
-int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
{
int ret = ps->read(bucket_meta_obj, result, objv_tracker);
if (ret < 0 && ret != -ENOENT) {
return 0;
}
-int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics,
+ RGWObjVersionTracker *objv_tracker)
{
int ret = ps->write(bucket_meta_obj, topics, objv_tracker);
if (ret < 0) {
return 0;
}
-int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
+int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
{
return read_topics(result, nullptr);
}
-int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
+int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
{
- rgw_pubsub_user_topics topics;
- int ret = get_user_topics(&topics);
+ rgw_pubsub_topics topics;
+ int ret = get_topics(&topics);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
+int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
{
- rgw_pubsub_user_topics topics;
- int ret = get_user_topics(&topics);
+ rgw_pubsub_topics topics;
+ int ret = get_topics(&topics);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events) {
- return create_notification(topic_name, events, std::nullopt, "");
+int RGWPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events) {
+ return create_notification(topic_name, events, std::nullopt, "");
}
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name) {
- rgw_pubsub_topic_subs user_topic_info;
+int RGWPubSub::Bucket::create_notification(const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name) {
+ rgw_pubsub_topic_subs topic_info;
rgw::sal::RGWRadosStore *store = ps->store;
- int ret = ps->get_topic(topic_name, &user_topic_info);
+ int ret = ps->get_topic(topic_name, &topic_info);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
return ret;
bucket.name << "'" << dendl;
auto& topic_filter = bucket_topics.topics[topic_name];
- topic_filter.topic = user_topic_info.topic;
+ topic_filter.topic = topic_info.topic;
topic_filter.events = events;
topic_filter.s3_id = notif_name;
if (s3_filter) {
return 0;
}
-int RGWUserPubSub::Bucket::remove_notification(const string& topic_name)
+int RGWPubSub::Bucket::remove_notification(const string& topic_name)
{
- rgw_pubsub_topic_subs user_topic_info;
+ rgw_pubsub_topic_subs topic_info;
rgw::sal::RGWRadosStore *store = ps->store;
- int ret = ps->get_topic(topic_name, &user_topic_info);
+ int ret = ps->get_topic(topic_name, &topic_info);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::create_topic(const string& name) {
+int RGWPubSub::create_topic(const string& name) {
return create_topic(name, rgw_pubsub_sub_dest(), "", "");
}
-int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data) {
+int RGWPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data) {
RGWObjVersionTracker objv_tracker;
- rgw_pubsub_user_topics topics;
+ rgw_pubsub_topics topics;
- int ret = read_user_topics(&topics, &objv_tracker);
+ int ret = read_topics(&topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
// its not an error if not topics exist, we create one
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
}
rgw_pubsub_topic_subs& new_topic = topics.topics[name];
- new_topic.topic.user = user;
+ new_topic.topic.user = rgw_user("", tenant);
new_topic.topic.name = name;
new_topic.topic.dest = dest;
new_topic.topic.arn = arn;
new_topic.topic.opaque_data = opaque_data;
- ret = write_user_topics(topics, &objv_tracker);
+ ret = write_topics(topics, &objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::remove_topic(const string& name)
+int RGWPubSub::remove_topic(const string& name)
{
RGWObjVersionTracker objv_tracker;
- rgw_pubsub_user_topics topics;
+ rgw_pubsub_topics topics;
- int ret = read_user_topics(&topics, &objv_tracker);
+ int ret = read_topics(&topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
topics.topics.erase(name);
- ret = write_user_topics(topics, &objv_tracker);
+ ret = write_topics(topics, &objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
{
int ret = ps->read(sub_meta_obj, result, objv_tracker);
if (ret < 0 && ret != -ENOENT) {
return 0;
}
-int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf,
+ RGWObjVersionTracker *objv_tracker)
{
int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker);
if (ret < 0) {
return 0;
}
-int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker)
{
int ret = ps->remove(sub_meta_obj, objv_tracker);
if (ret < 0) {
return 0;
}
-int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
+int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
{
return read_sub(result, nullptr);
}
-int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, const std::string& s3_id)
+int RGWPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, const std::string& s3_id)
{
- RGWObjVersionTracker user_objv_tracker;
- rgw_pubsub_user_topics topics;
+ RGWObjVersionTracker objv_tracker;
+ rgw_pubsub_topics topics;
rgw::sal::RGWRadosStore *store = ps->store;
- int ret = ps->read_user_topics(&topics, &user_objv_tracker);
+ int ret = ps->read_topics(&topics, &objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret != -ENOENT ? ret : -EINVAL;
rgw_pubsub_sub_config sub_conf;
- sub_conf.user = ps->user;
+ sub_conf.user = rgw_user("", ps->tenant);
sub_conf.name = sub;
sub_conf.topic = topic;
sub_conf.dest = dest;
t.subs.insert(sub);
- ret = ps->write_user_topics(topics, &user_objv_tracker);
+ ret = ps->write_topics(topics, &objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::Sub::unsubscribe(const string& _topic)
+int RGWPubSub::Sub::unsubscribe(const string& _topic)
{
string topic = _topic;
RGWObjVersionTracker sobjv_tracker;
}
RGWObjVersionTracker objv_tracker;
- rgw_pubsub_user_topics topics;
+ rgw_pubsub_topics topics;
- int ret = ps->read_user_topics(&topics, &objv_tracker);
+ int ret = ps->read_topics(&topics, &objv_tracker);
if (ret < 0) {
// not an error - could be that topic was already deleted
ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
t.subs.erase(sub);
- ret = ps->write_user_topics(topics, &objv_tracker);
+ ret = ps->write_topics(topics, &objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
template<typename EventType>
-void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
+void RGWPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
{
encode_json("next_marker", next_marker, f);
encode_json("is_truncated", is_truncated, f);
}
template<typename EventType>
-int RGWUserPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
+int RGWPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
{
RGWRados *store = ps->store->getRados();
rgw_pubsub_sub_config sub_conf;
}
template<typename EventType>
-int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
+int RGWPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
{
rgw::sal::RGWRadosStore *store = ps->store;
rgw_pubsub_sub_config sub_conf;
return 0;
}
-void RGWUserPubSub::get_user_meta_obj(rgw_raw_obj *obj) const {
- *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, user_meta_oid());
+void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const {
+ *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid());
}
-void RGWUserPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
+void RGWPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
*obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
}
-void RGWUserPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
+void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
*obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
}
template<typename EventType>
-void RGWUserPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
+void RGWPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
list.dump(f);
}
// explicit instantiation for the only two possible types
// no need to move implementation to header
-template class RGWUserPubSub::SubWithEvents<rgw_pubsub_event>;
-template class RGWUserPubSub::SubWithEvents<rgw_pubsub_s3_record>;
+template class RGWPubSub::SubWithEvents<rgw_pubsub_event>;
+template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_record>;
}
string to_str() const {
- return user.to_str() + "/" + name;
+ return user.tenant + "/" + name;
}
void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
-struct rgw_pubsub_user_topics {
+struct rgw_pubsub_topics {
std::map<std::string, rgw_pubsub_topic_subs> topics;
void encode(bufferlist& bl) const {
void dump(Formatter *f) const;
void dump_xml(Formatter *f) const;
};
-WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
+WRITE_CLASS_ENCODER(rgw_pubsub_topics)
-static std::string pubsub_user_oid_prefix = "pubsub.user.";
+static std::string pubsub_oid_prefix = "pubsub.";
-class RGWUserPubSub
+class RGWPubSub
{
friend class Bucket;
rgw::sal::RGWRadosStore *store;
- rgw_user user;
+ const std::string tenant;
RGWSysObjectCtx obj_ctx;
- rgw_raw_obj user_meta_obj;
+ rgw_raw_obj meta_obj;
- std::string user_meta_oid() const {
- return pubsub_user_oid_prefix + user.to_str();
+ std::string meta_oid() const {
+ return pubsub_oid_prefix + tenant;
}
std::string bucket_meta_oid(const rgw_bucket& bucket) const {
- return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
+ return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.bucket_id;
}
std::string sub_meta_oid(const string& name) const {
- return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
+ return pubsub_oid_prefix + tenant + ".sub." + name;
}
template <class T>
- int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
+ int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker);
template <class T>
- int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);
+ int write(const rgw_raw_obj& obj, const T& info,
+ RGWObjVersionTracker* obj_tracker);
- int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
+ int remove(const rgw_raw_obj& obj, RGWObjVersionTracker* objv_tracker);
- int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker);
- int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker);
+ int read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker* objv_tracker);
+ int write_topics(const rgw_pubsub_topics& topics,
+ RGWObjVersionTracker* objv_tracker);
public:
- RGWUserPubSub(rgw::sal::RGWRadosStore *_store, const rgw_user& _user);
+ RGWPubSub(rgw::sal::RGWRadosStore *_store, const std::string& tenant);
class Bucket {
- friend class RGWUserPubSub;
- RGWUserPubSub *ps;
+ friend class RGWPubSub;
+ RGWPubSub *ps;
rgw_bucket bucket;
rgw_raw_obj bucket_meta_obj;
// read the list of topics associated with a bucket and populate into result
// use version tacker to enforce atomicity between read/write
// return 0 on success or if no topic was associated with the bucket, error code otherwise
- int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
+ int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker* objv_tracker);
// set the list of topics associated with a bucket
// use version tacker to enforce atomicity between read/write
// return 0 on success, error code otherwise
- int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker);
+ int write_topics(const rgw_pubsub_bucket_topics& topics,
+ RGWObjVersionTracker* objv_tracker);
public:
- Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
+ Bucket(RGWPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
}
// base class for subscription
class Sub {
- friend class RGWUserPubSub;
+ friend class RGWPubSub;
protected:
- RGWUserPubSub* const ps;
+ RGWPubSub* const ps;
const std::string sub;
rgw_raw_obj sub_meta_obj;
- int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker);
- int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker);
- int remove_sub(RGWObjVersionTracker *objv_tracker);
+ int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker* objv_tracker);
+ int write_sub(const rgw_pubsub_sub_config& sub_conf,
+ RGWObjVersionTracker* objv_tracker);
+ int remove_sub(RGWObjVersionTracker* objv_tracker);
public:
- Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
+ Sub(RGWPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
ps->get_sub_meta_obj(sub, &sub_meta_obj);
}
} list;
public:
- SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
+ SubWithEvents(RGWPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
virtual ~SubWithEvents() = default;
return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub);
}
- void get_user_meta_obj(rgw_raw_obj *obj) const;
+ void get_meta_obj(rgw_raw_obj *obj) const;
void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const;
- // get all topics defined for the user and populate them into "result"
+ // get all topics (per tenant, if used)) and populate them into "result"
// return 0 on success or if no topics exist, error code otherwise
- int get_user_topics(rgw_pubsub_user_topics *result);
+ int get_topics(rgw_pubsub_topics *result);
// get a topic with its subscriptions by its name and populate it into "result"
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
template <class T>
-int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::read(const rgw_raw_obj& obj, T* result, RGWObjVersionTracker* objv_tracker)
{
bufferlist bl;
int ret = rgw_get_system_obj(obj_ctx,
}
template <class T>
-int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::write(const rgw_raw_obj& obj, const T& info,
+ RGWObjVersionTracker* objv_tracker)
{
bufferlist bl;
encode(info, bl);
return;
}
- ups.emplace(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto b = ps->get_bucket(bucket_info.bucket);
ceph_assert(b);
std::string data_bucket_prefix = "";
std::string data_oid_prefix = "";
// get topic information. destination information is stored in the topic
rgw_pubsub_topic topic_info;
- op_ret = ups->get_topic(topic_name, &topic_info);
+ op_ret = ps->get_topic(topic_name, &topic_info);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
// generate the internal topic. destination is stored here for the "push-only" case
// when no subscription exists
// ARN is cached to make the "GET" method faster
- op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data);
+ op_ret = ps->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;
ldout(s->cct, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;
// rollback generated topic (ignore return value)
- ups->remove_topic(unique_topic_name);
+ ps->remove_topic(unique_topic_name);
return;
}
ldout(s->cct, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
rgw_pubsub_sub_dest dest = topic_info.dest;
dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
dest.oid_prefix = data_oid_prefix + notif_name + "/";
- auto sub = ups->get_sub(notif_name);
+ auto sub = ps->get_sub(notif_name);
op_ret = sub->subscribe(unique_topic_name, dest, notif_name);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
// rollback generated notification (ignore return value)
b->remove_notification(unique_topic_name);
// rollback generated topic (ignore return value)
- ups->remove_topic(unique_topic_name);
+ ps->remove_topic(unique_topic_name);
return;
}
ldout(s->cct, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
return 0;
}
- void remove_notification_by_topic(const std::string& topic_name, const RGWUserPubSub::BucketRef& b) {
+ void remove_notification_by_topic(const std::string& topic_name, const RGWPubSub::BucketRef& b) {
op_ret = b->remove_notification(topic_name);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
}
- op_ret = ups->remove_topic(topic_name);
+ op_ret = ps->remove_topic(topic_name);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
}
return;
}
- ups.emplace(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto b = ps->get_bucket(bucket_info.bucket);
ceph_assert(b);
// get all topics on a bucket
if (unique_topic) {
// remove the auto generated subscription according to notification name (if exist)
const auto unique_topic_name = unique_topic->get().topic.name;
- auto sub = ups->get_sub(notif_name);
+ auto sub = ps->get_sub(notif_name);
op_ret = sub->unsubscribe(unique_topic_name);
if (op_ret < 0 && op_ret != -ENOENT) {
ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
for (const auto& topic : bucket_topics.topics) {
// remove the auto generated subscription of the topic (if exist)
rgw_pubsub_topic_subs topic_subs;
- op_ret = ups->get_topic(topic.first, &topic_subs);
+ op_ret = ps->get_topic(topic.first, &topic_subs);
for (const auto& topic_sub_name : topic_subs.subs) {
- auto sub = ups->get_sub(topic_sub_name);
+ auto sub = ps->get_sub(topic_sub_name);
rgw_pubsub_sub_config sub_conf;
op_ret = sub->get_conf(&sub_conf);
if (op_ret < 0) {
};
void RGWPSListNotifs_ObjStore_S3::execute() {
- ups.emplace(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto b = ps->get_bucket(bucket_info.bucket);
ceph_assert(b);
// get all topics on a bucket
return topic.topic.dest.stored_secret;
}
-bool topics_has_endpoint_secret(const rgw_pubsub_user_topics& topics) {
+bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
for (const auto& topic : topics.topics) {
if (topic_has_endpoint_secret(topic.second)) return true;
}
return;
}
- ups.emplace(store, s->owner.get_id());
- op_ret = ups->create_topic(topic_name, dest, topic_arn, opaque_data);
+ ps.emplace(store, s->owner.get_id().tenant);
+ op_ret = ps->create_topic(topic_name, dest, topic_arn, opaque_data);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
}
void RGWPSListTopicsOp::execute() {
- ups.emplace(store, s->owner.get_id());
- op_ret = ups->get_user_topics(&result);
+ ps.emplace(store, s->owner.get_id().tenant);
+ op_ret = ps->get_topics(&result);
// if there are no topics it is not considered an error
op_ret = op_ret == -ENOENT ? 0 : op_ret;
if (op_ret < 0) {
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- op_ret = ups->get_topic(topic_name, &result);
+ ps.emplace(store, s->owner.get_id().tenant);
+ op_ret = ps->get_topic(topic_name, &result);
if (topic_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
ldout(s->cct, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
op_ret = -EPERM;
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- op_ret = ups->remove_topic(topic_name);
+ ps.emplace(store, s->owner.get_id().tenant);
+ op_ret = ps->remove_topic(topic_name);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
return;
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto sub = ps->get_sub(sub_name);
op_ret = sub->subscribe(topic_name, dest);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto sub = ps->get_sub(sub_name);
op_ret = sub->get_conf(&result);
if (subscription_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
ldout(s->cct, 1) << "subscription '" << sub_name << "' contain secret and cannot be sent over insecure transport" << dendl;
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto sub = ps->get_sub(sub_name);
op_ret = sub->unsubscribe(topic_name);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- auto sub = ups->get_sub_with_events(sub_name);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto sub = ps->get_sub_with_events(sub_name);
op_ret = sub->remove_event(event_id);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- sub = ups->get_sub_with_events(sub_name);
+ ps.emplace(store, s->owner.get_id().tenant);
+ sub = ps->get_sub_with_events(sub_name);
if (!sub) {
op_ret = -ENOENT;
ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
// create a topic
class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
protected:
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
std::string topic_name;
rgw_pubsub_sub_dest dest;
std::string topic_arn;
// list all topics
class RGWPSListTopicsOp : public RGWOp {
protected:
- std::optional<RGWUserPubSub> ups;
- rgw_pubsub_user_topics result;
+ std::optional<RGWPubSub> ps;
+ rgw_pubsub_topics result;
public:
int verify_permission() override {
class RGWPSGetTopicOp : public RGWOp {
protected:
std::string topic_name;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
rgw_pubsub_topic_subs result;
virtual int get_params() = 0;
class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
protected:
string topic_name;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
virtual int get_params() = 0;
protected:
std::string sub_name;
std::string topic_name;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
rgw_pubsub_sub_dest dest;
virtual int get_params() = 0;
class RGWPSGetSubOp : public RGWOp {
protected:
std::string sub_name;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
rgw_pubsub_sub_config result;
virtual int get_params() = 0;
protected:
std::string sub_name;
std::string topic_name;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
virtual int get_params() = 0;
protected:
std::string sub_name;
std::string event_id;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
virtual int get_params() = 0;
int max_entries{0};
std::string sub_name;
std::string marker;
- std::optional<RGWUserPubSub> ups;
- RGWUserPubSub::SubRef sub;
+ std::optional<RGWPubSub> ps;
+ RGWPubSub::SubRef sub;
virtual int get_params() = 0;
// notification creation
class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
protected:
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
string bucket_name;
RGWBucketInfo bucket_info;
// delete a notification
class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
protected:
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
std::string bucket_name;
RGWBucketInfo bucket_info;
protected:
std::string bucket_name;
RGWBucketInfo bucket_info;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
virtual int get_params() = 0;
} else {
using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
yield {
- RGWUserPubSub ups(sync_env->store, owner);
+ RGWPubSub ps(sync_env->store, owner.tenant);
rgw_raw_obj obj;
- ups.get_sub_meta_obj(sub_name, &obj);
+ ps.get_sub_meta_obj(sub_name, &obj);
bool empty_on_enoent = false;
call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
obj,
rgw_obj_key key;
rgw::notify::EventType event_type;
- RGWUserPubSub ups;
+ RGWPubSub ps;
rgw_raw_obj bucket_obj;
rgw_raw_obj user_obj;
rgw_pubsub_bucket_topics bucket_topics;
- rgw_pubsub_user_topics user_topics;
+ rgw_pubsub_topics user_topics;
TopicsRef *topics;
public:
RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
bucket(_bucket),
key(_key),
event_type(_event_type),
- ups(sync_env->store, owner),
+ ps(sync_env->store, owner.tenant),
topics(_topics) {
*topics = std::make_shared<vector<PSTopicConfigRef> >();
}
int operate() override {
reenter(this) {
- ups.get_bucket_meta_obj(bucket, &bucket_obj);
- ups.get_user_meta_obj(&user_obj);
+ ps.get_bucket_meta_obj(bucket, &bucket_obj);
+ ps.get_meta_obj(&user_obj);
using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
yield {
ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
if (!bucket_topics.topics.empty()) {
- using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+ using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_topics>;
yield {
bool empty_on_enoent = true;
call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
sub_name = s->object.name;
marker = s->info.args.get("marker");
const int ret = s->info.args.get_int("max-entries", &max_entries,
- RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS);
+ RGWPubSub::Sub::DEFAULT_MAX_EVENTS);
if (ret < 0) {
ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl;
return -EINVAL;
void RGWPSCreateNotif_ObjStore::execute()
{
- ups.emplace(store, s->owner.get_id());
+ ps.emplace(store, s->owner.get_id().tenant);
- auto b = ups->get_bucket(bucket_info.bucket);
+ auto b = ps->get_bucket(bucket_info.bucket);
op_ret = b->create_notification(topic_name, events);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
}
- ups.emplace(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto b = ps->get_bucket(bucket_info.bucket);
op_ret = b->remove_notification(topic_name);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
void RGWPSListNotifs_ObjStore::execute()
{
- ups.emplace(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto b = ps->get_bucket(bucket_info.bucket);
op_ret = b->get_topics(&result);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
client_threads = []
objects_size = {}
for i in range(number_of_objects):
- object_size = randint(1, 1024)
- content = str(os.urandom(object_size))
+ content = str(os.urandom(randint(1, 1024)))
key = bucket.new_key(str(i))
- objects_size[key.name] = object_size
+ objects_size[key.name] = len(content)
thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
thr.start()
client_threads.append(thr)
objects_size = {}
start_time = time.time()
for i in range(number_of_objects):
- object_size = randint(1, 1024)
- content = str(os.urandom(object_size))
+ content = str(os.urandom(randint(1, 1024)))
key = bucket.new_key(str(i))
- objects_size[key.name] = object_size
+ objects_size[key.name] = len(content)
thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
thr.start()
client_threads.append(thr)
# create objects in the bucket using COPY
bucket.copy_key('copy_of_foo', bucket.name, key.name)
+
# create objects in the bucket using multi-part upload
- fp = tempfile.NamedTemporaryFile(mode='w')
- fp.write('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
- fp.close()
+ fp = tempfile.NamedTemporaryFile(mode='w+b')
+ object_size = 1024
+ content = bytearray(os.urandom(object_size))
+ fp.write(content)
+ fp.flush()
+ fp.seek(0)
uploader = bucket.initiate_multipart_upload('multipart_foo',
metadata={meta_key: meta_value})
- fp = tempfile.TemporaryFile(mode='r')
uploader.upload_part_from_file(fp, 1)
uploader.complete_upload()
fp.close()
+
print('wait for 5sec for the messages...')
time.sleep(5)
# check amqp receiver