+++ /dev/null
-==================
-PubSub Sync Module
-==================
-
-.. versionadded:: Nautilus
-
-.. contents::
-
-This sync module provides a publish and subscribe mechanism for the object store modification
-events. Events are published into predefined topics. Topics can be subscribed to, and events
-can be pulled from them. Events need to be acked. Also, events will expire and disappear
-after a period of time.
-
-A push notification mechanism exists too, currently supporting HTTP,
-AMQP0.9.1 and Kafka endpoints. In this case, the events are pushed to an endpoint on top of storing them in Ceph. If events should only be pushed to an endpoint
-and do not need to be stored in Ceph, the `Bucket Notification`_ mechanism should be used instead of pubsub sync module.
-
-A user can create different topics. A topic entity is defined by its name and is per tenant. A
-user can only associate its topics (via notification configuration) with buckets it owns.
-
-In order to publish events for specific bucket a notification entity needs to be created. A
-notification can be created on a subset of event types, or for all event types (default).
-There can be multiple notifications for any specific topic, and the same topic could be used for multiple notifications.
-
-A subscription to a topic can also be defined. There can be multiple subscriptions for any
-specific topic.
-
-REST API has been defined to provide configuration and control interfaces for the pubsub
-mechanisms. This API has two flavors, one is S3-compatible and one is not. The two flavors can be used
-together, although it is recommended to use the S3-compatible one.
-The S3-compatible API is similar to the one used in the bucket notification mechanism.
-
-Events are stored as RGW objects in a special bucket, under a special user (pubsub control user). Events cannot
-be accessed directly, but need to be pulled and acked using the new REST API.
-
-.. toctree::
- :maxdepth: 1
-
- S3 Bucket Notification Compatibility <s3-notification-compatibility>
-
-.. note:: To enable bucket notifications API, the `rgw_enable_apis` configuration parameter should contain: "notifications".
-
-PubSub Zone Configuration
--------------------------
-
-The pubsub sync module requires the creation of a new zone in a :ref:`multisite` environment...
-First, a master zone must exist (see: :ref:`master-zone-label`),
-then a secondary zone should be created (see :ref:`secondary-zone-label`).
-In the creation of the secondary zone, its tier type must be set to ``pubsub``:
-
-::
-
- # radosgw-admin zone create --rgw-zonegroup={zone-group-name} \
- --rgw-zone={zone-name} \
- --endpoints={http://fqdn}[,{http://fqdn}] \
- --sync-from-all=0 \
- --sync-from={master-zone-name} \
- --tier-type=pubsub
-
-
-PubSub Zone Configuration Parameters
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-::
-
- {
- "tenant": <tenant>, # default: <empty>
- "uid": <uid>, # default: "pubsub"
- "data_bucket_prefix": <prefix> # default: "pubsub-"
- "data_oid_prefix": <prefix> #
- "events_retention_days": <days> # default: 7
- }
-
-* ``tenant`` (string)
-
-The tenant of the pubsub control user.
-
-* ``uid`` (string)
-
-The uid of the pubsub control user.
-
-* ``data_bucket_prefix`` (string)
-
-The prefix of the bucket name that will be created to store events for specific topic.
-
-* ``data_oid_prefix`` (string)
-
-The oid prefix for the stored events.
-
-* ``events_retention_days`` (integer)
-
-How many days to keep events that weren't acked.
-
-Configuring Parameters via CLI
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The tier configuration could be set using the following command:
-
-::
-
- # radosgw-admin zone modify --rgw-zonegroup={zone-group-name} \
- --rgw-zone={zone-name} \
- --tier-config={key}={val}[,{key}={val}]
-
-Where the ``key`` in the configuration specifies the configuration variable that needs to be updated (from the list above), and
-the ``val`` specifies its new value. For example, setting the pubsub control user ``uid`` to ``user_ps``:
-
-::
-
- # radosgw-admin zone modify --rgw-zonegroup={zone-group-name} \
- --rgw-zone={zone-name} \
- --tier-config=uid=pubsub
-
-A configuration field can be removed by using ``--tier-config-rm={key}``.
-
-
-Topic and Subscription Management via CLI
------------------------------------------
-
-Configuration of all topics, associated with a tenant, could be fetched using the following command:
-
-::
-
- # radosgw-admin topic list [--tenant={tenant}]
-
-
-Configuration of a specific topic could be fetched using:
-
-::
-
- # radosgw-admin topic get --topic={topic-name} [--tenant={tenant}]
-
-
-And removed using:
-
-::
-
- # radosgw-admin topic rm --topic={topic-name} [--tenant={tenant}]
-
-
-Configuration of a subscription could be fetched using:
-
-::
-
- # radosgw-admin subscription get --subscription={topic-name} [--tenant={tenant}]
-
-And removed using:
-
-::
-
- # radosgw-admin subscription rm --subscription={topic-name} [--tenant={tenant}]
-
-
-To fetch all of the events stored in a subscription, use:
-
-::
-
- # radosgw-admin subscription pull --subscription={topic-name} [--marker={last-marker}] [--tenant={tenant}]
-
-
-To ack (and remove) an event from a subscription, use:
-
-::
-
- # radosgw-admin subscription ack --subscription={topic-name} --event-id={event-id} [--tenant={tenant}]
-
-
-PubSub Performance Stats
--------------------------
-Same counters are shared between the pubsub sync module and the notification mechanism.
-
-- ``pubsub_event_triggered``: running counter of events with at lease one topic associated with them
-- ``pubsub_event_lost``: running counter of events that had topics and subscriptions associated with them but that were not stored or pushed to any of the subscriptions
-- ``pubsub_store_ok``: running counter, for all subscriptions, of stored events
-- ``pubsub_store_fail``: running counter, for all subscriptions, of events failed to be stored
-- ``pubsub_push_ok``: running counter, for all subscriptions, of events successfully pushed to their endpoint
-- ``pubsub_push_fail``: running counter, for all subscriptions, of events failed to be pushed to their endpoint
-- ``pubsub_push_pending``: gauge value of events pushed to an endpoint but not acked or nacked yet
-
-.. note::
-
- ``pubsub_event_triggered`` and ``pubsub_event_lost`` are incremented per event, while:
- ``pubsub_store_ok``, ``pubsub_store_fail``, ``pubsub_push_ok``, ``pubsub_push_fail``, are incremented per store/push action on each subscriptions.
-
-PubSub REST API
----------------
-
-.. tip:: PubSub REST calls, and only them, should be sent to an RGW which belong to a PubSub zone
-
-Topics
-~~~~~~
-
-.. _radosgw-create-a-topic:
-
-Create a Topic
-``````````````
-
-This will create a new topic. Topic creation is needed both for both flavors of the API.
-Optionally the topic could be provided with push endpoint parameters that would be used later
-when an S3-compatible notification is created.
-Upon successful request, the response will include the topic ARN that could be later used to reference this topic in an S3-compatible notification request.
-To update a topic, use the same command used for topic creation, with the topic name of an existing topic and different endpoint values.
-
-.. tip:: Any S3-compatible notification already associated with the topic needs to be re-created for the topic update to take effect
-
-::
-
- PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
-
-Request parameters:
-
-- push-endpoint: URI of an endpoint to send push notification to
-- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic
-
-The endpoint URI may include parameters depending with the type of endpoint:
-
-- HTTP endpoint
-
- - URI: ``http[s]://<fqdn>[:<port]``
- - port defaults to: 80/443 for HTTP/S accordingly
- - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
-
-- AMQP0.9.1 endpoint
-
- - URI: ``amqp[s]://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
- - user/password defaults to: guest/guest
- - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
- - port defaults to: 5672/5671 for unencrypted/SSL-encrypted connections
- - vhost defaults to: "/"
- - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
- - if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
- - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1). Different topics pointing to the same endpoint must use the same exchange
- - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
-
- - "none": message is considered "delivered" if sent to broker
- - "broker": message is considered "delivered" if acked by broker (default)
- - "routable": message is considered "delivered" if broker can route to a consumer
-
-.. tip:: The topic-name (see :ref:`radosgw-create-a-topic`) is used for the AMQP topic ("routing key" for a topic exchange)
-
-- Kafka endpoint
-
- - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port]``
- - if ``use-ssl`` is set to "true", secure connection will be used for connecting with the broker ("false" by default)
- - if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
- - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
- - user/password may only be provided together with ``use-ssl``, connection to the broker would fail if not
- - port defaults to: 9092
- - kafka-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
-
- - "none": message is considered "delivered" if sent to broker
- - "broker": message is considered "delivered" if acked by broker (default)
-
-The topic ARN in the response will have the following format:
-
-::
-
- arn:aws:sns:<zone-group>:<tenant>:<topic>
-
-Get Topic Information
-`````````````````````
-
-Returns information about specific topic. This includes subscriptions to that topic, and push-endpoint information, if provided.
-
-::
-
- GET /topics/<topic-name>
-
-Response will have the following format (JSON):
-
-::
-
- {
- "topic":{
- "user":"",
- "name":"",
- "dest":{
- "bucket_name":"",
- "oid_prefix":"",
- "push_endpoint":"",
- "push_endpoint_args":"",
- "push_endpoint_topic":"",
- "stored_secret":false,
- "persistent":true,
- },
- "arn":""
- "opaqueData":""
- },
- "subs":[]
- }
-
-- topic.user: name of the user that created the topic
-- name: name of the topic
-- dest.bucket_name: not used
-- dest.oid_prefix: not used
-- dest.push_endpoint: in case of S3-compliant notifications, this value will be used as the push-endpoint URL
-- if push-endpoint URL contain user/password information, request must be made over HTTPS. Topic get request will be rejected if not
-- dest.push_endpoint_args: in case of S3-compliant notifications, this value will be used as the push-endpoint args
-- dest.push_endpoint_topic: in case of S3-compliant notifications, this value will hold the topic name as sent to the endpoint (may be different than the internal topic name)
-- topic.arn: topic ARN
-- subs: list of subscriptions associated with this topic
-
-Delete Topic
-````````````
-
-::
-
- DELETE /topics/<topic-name>
-
-Delete the specified topic.
-
-List Topics
-```````````
-
-List all topics associated with a tenant.
-
-::
-
- GET /topics
-
-- if push-endpoint URL contain user/password information, in any of the topic, request must be made over HTTPS. Topic list request will be rejected if not
-
-S3-Compliant Notifications
-~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Detailed under: `Bucket Operations`_.
-
-.. note::
-
- - Notification creation will also create a subscription for pushing/pulling events
- - The generated subscription's name will have the same as the notification Id, and could be used later to fetch and ack events with the subscription API.
- - Notification deletion will deletes all generated subscriptions
- - In case that bucket deletion implicitly deletes the notification,
- the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access),
- and will have to be deleted explicitly with the subscription deletion API
- - Filtering based on metadata (which is an extension to S3) is not supported, and such rules will be ignored
- - Filtering based on tags (which is an extension to S3) is not supported, and such rules will be ignored
-
-
-Non S3-Compliant Notifications
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Create a Notification
-`````````````````````
-
-This will create a publisher for a specific bucket into a topic.
-
-::
-
- PUT /notifications/bucket/<bucket>?topic=<topic-name>[&events=<event>[,<event>]]
-
-Request parameters:
-
-- topic-name: name of topic
-- event: event type (string), one of: ``OBJECT_CREATE``, ``OBJECT_DELETE``, ``DELETE_MARKER_CREATE``
-
-Delete Notification Information
-```````````````````````````````
-
-Delete publisher from a specific bucket into a specific topic.
-
-::
-
- DELETE /notifications/bucket/<bucket>?topic=<topic-name>
-
-Request parameters:
-
-- topic-name: name of topic
-
-.. note:: When the bucket is deleted, any notification defined on it is also deleted
-
-List Notifications
-``````````````````
-
-List all topics with associated events defined on a bucket.
-
-::
-
- GET /notifications/bucket/<bucket>
-
-Response will have the following format (JSON):
-
-::
-
- {"topics":[
- {
- "topic":{
- "user":"",
- "name":"",
- "dest":{
- "bucket_name":"",
- "oid_prefix":"",
- "push_endpoint":"",
- "push_endpoint_args":"",
- "push_endpoint_topic":""
- }
- "arn":""
- },
- "events":[]
- }
- ]}
-
-Subscriptions
-~~~~~~~~~~~~~
-
-Create a Subscription
-`````````````````````
-
-Creates a new subscription.
-
-::
-
- PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]
-
-Request parameters:
-
-- topic-name: name of topic
-- push-endpoint: URI of endpoint to send push notification to
-
-The endpoint URI may include parameters depending with the type of endpoint:
-
-- HTTP endpoint
-
- - URI: ``http[s]://<fqdn>[:<port]``
- - port defaults to: 80/443 for HTTP/S accordingly
- - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
-
-- AMQP0.9.1 endpoint
-
- - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
- - user/password defaults to : guest/guest
- - port defaults to: 5672
- - vhost defaults to: "/"
- - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
-
- - "none": message is considered "delivered" if sent to broker
- - "broker": message is considered "delivered" if acked by broker (default)
- - "routable": message is considered "delivered" if broker can route to a consumer
-
-- Kafka endpoint
-
- - URI: ``kafka://[<user>:<password>@]<fqdn>[:<port]``
- - if ``ca-location`` is provided, secure connection will be used for connection with the broker
- - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
- - user/password may only be provided together with ``ca-location``. Topic creation request will be rejected if not
- - port defaults to: 9092
- - kafka-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
-
- - "none": message is considered "delivered" if sent to broker
- - "broker": message is considered "delivered" if acked by broker (default)
-
-
-Get Subscription Information
-````````````````````````````
-
-Returns information about specific subscription.
-
-::
-
- GET /subscriptions/<sub-name>
-
-Response will have the following format (JSON):
-
-::
-
- {
- "user":"",
- "name":"",
- "topic":"",
- "dest":{
- "bucket_name":"",
- "oid_prefix":"",
- "push_endpoint":"",
- "push_endpoint_args":"",
- "push_endpoint_topic":""
- }
- "s3_id":""
- }
-
-- user: name of the user that created the subscription
-- name: name of the subscription
-- topic: name of the topic the subscription is associated with
-- dest.bucket_name: name of the bucket storing the events
-- dest.oid_prefix: oid prefix for the events stored in the bucket
-- dest.push_endpoint: in case of S3-compliant notifications, this value will be used as the push-endpoint URL
-- if push-endpoint URL contain user/password information, request must be made over HTTPS. Topic get request will be rejected if not
-- dest.push_endpoint_args: in case of S3-compliant notifications, this value will be used as the push-endpoint args
-- dest.push_endpoint_topic: in case of S3-compliant notifications, this value will hold the topic name as sent to the endpoint (may be different than the internal topic name)
-- s3_id: in case of S3-compliant notifications, this will hold the notification name that created the subscription
-
-Delete Subscription
-```````````````````
-
-Removes a subscription.
-
-::
-
- DELETE /subscriptions/<sub-name>
-
-Events
-~~~~~~
-
-Pull Events
-```````````
-
-Pull events sent to a specific subscription.
-
-::
-
- GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
-
-Request parameters:
-
-- marker: pagination marker for list of events, if not specified will start from the oldest
-- max-entries: max number of events to return
-
-The response will hold information on the current marker and whether there are more events not fetched:
-
-::
-
- {"next_marker":"","is_truncated":false,...}
-
-
-The actual content of the response is depended with how the subscription was created.
-In case that the subscription was created via an S3-compatible notification,
-the events will have an S3-compatible record format (JSON):
-
-::
-
- {"Records":[
- {
- "eventVersion":"2.1"
- "eventSource":"aws:s3",
- "awsRegion":"",
- "eventTime":"",
- "eventName":"",
- "userIdentity":{
- "principalId":""
- },
- "requestParameters":{
- "sourceIPAddress":""
- },
- "responseElements":{
- "x-amz-request-id":"",
- "x-amz-id-2":""
- },
- "s3":{
- "s3SchemaVersion":"1.0",
- "configurationId":"",
- "bucket":{
- "name":"",
- "ownerIdentity":{
- "principalId":""
- },
- "arn":"",
- "id":""
- },
- "object":{
- "key":"",
- "size":"0",
- "eTag":"",
- "versionId":"",
- "sequencer":"",
- "metadata":[],
- "tags":[]
- }
- },
- "eventId":"",
- "opaqueData":"",
- }
- ]}
-
-- awsRegion: zonegroup
-- eventTime: timestamp indicating when the event was triggered
-- eventName: either ``s3:ObjectCreated:``, or ``s3:ObjectRemoved:``
-- userIdentity: not supported
-- requestParameters: not supported
-- responseElements: not supported
-- s3.configurationId: notification ID that created the subscription for the event
-- s3.bucket.name: name of the bucket
-- s3.bucket.ownerIdentity.principalId: owner of the bucket
-- s3.bucket.arn: ARN of the bucket
-- s3.bucket.id: Id of the bucket (an extension to the S3 notification API)
-- s3.object.key: object key
-- s3.object.size: not supported
-- s3.object.eTag: object etag
-- s3.object.version: object version in case of versioned bucket
-- s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format)
-- s3.object.metadata: not supported (an extension to the S3 notification API)
-- s3.object.tags: not supported (an extension to the S3 notification API)
-- s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
-- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic (an extension to the S3 notification API)
-
-In case that the subscription was not created via a non S3-compatible notification,
-the events will have the following event format (JSON):
-
-::
-
- {"events":[
- {
- "id":"",
- "event":"",
- "timestamp":"",
- "info":{
- "attrs":{
- "mtime":""
- },
- "bucket":{
- "bucket_id":"",
- "name":"",
- "tenant":""
- },
- "key":{
- "instance":"",
- "name":""
- }
- }
- }
- ]}
-
-- id: unique ID of the event, that could be used for acking
-- event: one of: ``OBJECT_CREATE``, ``OBJECT_DELETE``, ``DELETE_MARKER_CREATE``
-- timestamp: timestamp indicating when the event was sent
-- info.attrs.mtime: timestamp indicating when the event was triggered
-- info.bucket.bucket_id: id of the bucket
-- info.bucket.name: name of the bucket
-- info.bucket.tenant: tenant the bucket belongs to
-- info.key.instance: object version in case of versioned bucket
-- info.key.name: object key
-
-Ack Event
-`````````
-
-Ack event so that it can be removed from the subscription history.
-
-::
-
- POST /subscriptions/<sub-name>?ack&event-id=<event-id>
-
-Request parameters:
-
-- event-id: id of event to be acked
-
-.. _Bucket Notification : ../notifications
-.. _Bucket Operations: ../s3/bucketops
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab ft=cpp
-
-#include <algorithm>
-#include "rgw_rest_pubsub_common.h"
-#include "rgw_rest_pubsub.h"
-#include "rgw_sync_module_pubsub.h"
-#include "rgw_pubsub_push.h"
-#include "rgw_sync_module_pubsub_rest.h"
-#include "rgw_pubsub.h"
-#include "rgw_op.h"
-#include "rgw_rest.h"
-#include "rgw_rest_s3.h"
-#include "rgw_arn.h"
-#include "rgw_zone.h"
-#include "services/svc_zone.h"
-#include "rgw_sal_rados.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_rgw
-
-using namespace std;
-
-// command: PUT /topics/<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
-class RGWPSCreateTopic_ObjStore : public RGWPSCreateTopicOp {
-public:
- int get_params() override {
-
- topic_name = s->object->get_name();
-
- opaque_data = s->info.args.get("OpaqueData");
- dest.push_endpoint = s->info.args.get("push-endpoint");
-
- if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
- return -EINVAL;
- }
- dest.push_endpoint_args = s->info.args.get_str();
- // dest object only stores endpoint info
- // bucket to store events/records will be set only when subscription is created
- dest.bucket_name = "";
- dest.oid_prefix = "";
- dest.arn_topic = topic_name;
- // the topic ARN will be sent in the reply
- const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
- store->get_zone()->get_zonegroup().get_name(),
- s->user->get_tenant(), topic_name);
- topic_arn = arn.to_string();
- return 0;
- }
-
- void send_response() override {
- if (op_ret) {
- set_req_state_err(s, op_ret);
- }
- dump_errno(s);
- end_header(s, this, "application/json");
-
- if (op_ret < 0) {
- return;
- }
-
- {
- Formatter::ObjectSection section(*s->formatter, "result");
- encode_json("arn", topic_arn, s->formatter);
- }
- rgw_flush_formatter_and_reset(s, s->formatter);
- }
-};
-
-// command: GET /topics
-class RGWPSListTopics_ObjStore : public RGWPSListTopicsOp {
-public:
- void send_response() override {
- if (op_ret) {
- set_req_state_err(s, op_ret);
- }
- dump_errno(s);
- end_header(s, this, "application/json");
-
- if (op_ret < 0) {
- return;
- }
-
- encode_json("result", result, s->formatter);
- rgw_flush_formatter_and_reset(s, s->formatter);
- }
-};
-
-// command: GET /topics/<topic-name>
-class RGWPSGetTopic_ObjStore : public RGWPSGetTopicOp {
-public:
- int get_params() override {
- topic_name = s->object->get_name();
- return 0;
- }
-
- void send_response() override {
- if (op_ret) {
- set_req_state_err(s, op_ret);
- }
- dump_errno(s);
- end_header(s, this, "application/json");
-
- if (op_ret < 0) {
- return;
- }
-
- encode_json("result", result, s->formatter);
- rgw_flush_formatter_and_reset(s, s->formatter);
- }
-};
-
-// command: DELETE /topics/<topic-name>
-class RGWPSDeleteTopic_ObjStore : public RGWPSDeleteTopicOp {
-public:
- int get_params() override {
- topic_name = s->object->get_name();
- return 0;
- }
-};
-
-// ceph specifc topics handler factory
-class RGWHandler_REST_PSTopic : public RGWHandler_REST_S3 {
-protected:
- int init_permissions(RGWOp* op, optional_yield) override {
- return 0;
- }
-
- int read_permissions(RGWOp* op, optional_yield) override {
- return 0;
- }
-
- bool supports_quota() override {
- return false;
- }
-
- RGWOp *op_get() override {
- if (s->init_state.url_bucket.empty()) {
- return nullptr;
- }
- if (s->object == nullptr || s->object->empty()) {
- return new RGWPSListTopics_ObjStore();
- }
- return new RGWPSGetTopic_ObjStore();
- }
- RGWOp *op_put() override {
- if (!s->object->empty()) {
- return new RGWPSCreateTopic_ObjStore();
- }
- return nullptr;
- }
- RGWOp *op_delete() override {
- if (!s->object->empty()) {
- return new RGWPSDeleteTopic_ObjStore();
- }
- return nullptr;
- }
-public:
- explicit RGWHandler_REST_PSTopic(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
- virtual ~RGWHandler_REST_PSTopic() = default;
-};
-
-// command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
-class RGWPSCreateSub_ObjStore : public RGWPSCreateSubOp {
-public:
- int get_params() override {
- sub_name = s->object->get_name();
-
- bool exists;
- topic_name = s->info.args.get("topic", &exists);
- if (!exists) {
- ldpp_dout(this, 1) << "missing required param 'topic'" << dendl;
- return -EINVAL;
- }
-
- const auto psmodule = static_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
- const auto& conf = psmodule->get_effective_conf();
-
- dest.push_endpoint = s->info.args.get("push-endpoint");
- if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
- return -EINVAL;
- }
- dest.push_endpoint_args = s->info.args.get_str();
- dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
- dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
- dest.arn_topic = topic_name;
-
- return 0;
- }
-};
-
-// command: GET /subscriptions/<sub-name>
-class RGWPSGetSub_ObjStore : public RGWPSGetSubOp {
-public:
- int get_params() override {
- sub_name = s->object->get_name();
- return 0;
- }
- void send_response() override {
- if (op_ret) {
- set_req_state_err(s, op_ret);
- }
- dump_errno(s);
- end_header(s, this, "application/json");
-
- if (op_ret < 0) {
- return;
- }
-
- encode_json("result", result, s->formatter);
- rgw_flush_formatter_and_reset(s, s->formatter);
- }
-};
-
-// command: DELETE /subscriptions/<sub-name>
-class RGWPSDeleteSub_ObjStore : public RGWPSDeleteSubOp {
-public:
- int get_params() override {
- sub_name = s->object->get_name();
- topic_name = s->info.args.get("topic");
- return 0;
- }
-};
-
-// command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
-class RGWPSAckSubEvent_ObjStore : public RGWPSAckSubEventOp {
-public:
- explicit RGWPSAckSubEvent_ObjStore() {}
-
- int get_params() override {
- sub_name = s->object->get_name();
-
- bool exists;
-
- event_id = s->info.args.get("event-id", &exists);
- if (!exists) {
- ldpp_dout(this, 1) << "missing required param 'event-id'" << dendl;
- return -EINVAL;
- }
- return 0;
- }
-};
-
-// command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
-class RGWPSPullSubEvents_ObjStore : public RGWPSPullSubEventsOp {
-public:
- int get_params() override {
- sub_name = s->object->get_name();
- marker = s->info.args.get("marker");
- const int ret = s->info.args.get_int("max-entries", &max_entries,
- RGWPubSub::Sub::DEFAULT_MAX_EVENTS);
- if (ret < 0) {
- ldpp_dout(this, 1) << "failed to parse 'max-entries' param" << dendl;
- return -EINVAL;
- }
- return 0;
- }
-
- void send_response() override {
- if (op_ret) {
- set_req_state_err(s, op_ret);
- }
- dump_errno(s);
- end_header(s, this, "application/json");
-
- if (op_ret < 0) {
- return;
- }
-
- encode_json("result", *sub, s->formatter);
- rgw_flush_formatter_and_reset(s, s->formatter);
- }
-};
-
-// subscriptions handler factory
-class RGWHandler_REST_PSSub : public RGWHandler_REST_S3 {
-protected:
- int init_permissions(RGWOp* op, optional_yield) override {
- return 0;
- }
-
- int read_permissions(RGWOp* op, optional_yield) override {
- return 0;
- }
- bool supports_quota() override {
- return false;
- }
- RGWOp *op_get() override {
- if (s->object->empty()) {
- return nullptr;
- }
- if (s->info.args.exists("events")) {
- return new RGWPSPullSubEvents_ObjStore();
- }
- return new RGWPSGetSub_ObjStore();
- }
- RGWOp *op_put() override {
- if (!s->object->empty()) {
- return new RGWPSCreateSub_ObjStore();
- }
- return nullptr;
- }
- RGWOp *op_delete() override {
- if (!s->object->empty()) {
- return new RGWPSDeleteSub_ObjStore();
- }
- return nullptr;
- }
- RGWOp *op_post() override {
- if (s->info.args.exists("ack")) {
- return new RGWPSAckSubEvent_ObjStore();
- }
- return nullptr;
- }
-public:
- explicit RGWHandler_REST_PSSub(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
- virtual ~RGWHandler_REST_PSSub() = default;
-};
-
-namespace {
-// extract bucket name from ceph specific notification command, with the format:
-// /notifications/<bucket-name>
-int notif_bucket_path(const string& path, std::string& bucket_name) {
- if (path.empty()) {
- return -EINVAL;
- }
- size_t pos = path.find('/');
- if (pos == string::npos) {
- return -EINVAL;
- }
- if (pos >= path.size()) {
- return -EINVAL;
- }
-
- string type = path.substr(0, pos);
- if (type != "bucket") {
- return -EINVAL;
- }
-
- bucket_name = path.substr(pos + 1);
- return 0;
-}
-}
-
-// command (ceph specific): PUT /notification/bucket/<bucket name>?topic=<topic name>
-class RGWPSCreateNotif_ObjStore : public RGWPSCreateNotifOp {
-private:
- std::string topic_name;
- rgw::notify::EventTypeList events;
-
- int get_params() override {
- bool exists;
- topic_name = s->info.args.get("topic", &exists);
- if (!exists) {
- ldpp_dout(this, 1) << "missing required param 'topic'" << dendl;
- return -EINVAL;
- }
-
- std::string events_str = s->info.args.get("events", &exists);
- if (!exists) {
- // if no events are provided, we notify on all of them
- events_str =
- "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE,OBJECT_EXPIRATION";
- }
- rgw::notify::from_string_list(events_str, events);
- if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
- ldpp_dout(this, 1) << "invalid event type in list: " << events_str << dendl;
- return -EINVAL;
- }
- return notif_bucket_path(s->object->get_name(), bucket_name);
- }
-
-public:
- const char* name() const override { return "pubsub_notification_create"; }
- void execute(optional_yield y) override;
-};
-
-void RGWPSCreateNotif_ObjStore::execute(optional_yield y)
-{
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-
- auto b = ps->get_bucket(bucket_info.bucket);
- op_ret = b->create_notification(this, topic_name, events, y);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldpp_dout(this, 20) << "successfully created notification for topic '" << topic_name << "'" << dendl;
-}
-
-// command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
-class RGWPSDeleteNotif_ObjStore : public RGWPSDeleteNotifOp {
-private:
- std::string topic_name;
-
- int get_params() override {
- bool exists;
- topic_name = s->info.args.get("topic", &exists);
- if (!exists) {
- ldpp_dout(this, 1) << "missing required param 'topic'" << dendl;
- return -EINVAL;
- }
- return notif_bucket_path(s->object->get_name(), bucket_name);
- }
-
-public:
- void execute(optional_yield y) override;
- const char* name() const override { return "pubsub_notification_delete"; }
-};
-
-void RGWPSDeleteNotif_ObjStore::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
-
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- auto b = ps->get_bucket(bucket_info.bucket);
- op_ret = b->remove_notification(this, topic_name, y);
- if (op_ret < 0) {
- ldpp_dout(s, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldpp_dout(this, 20) << "successfully removed notification from topic '" << topic_name << "'" << dendl;
-}
-
-// command: GET /notifications/bucket/<bucket>
-class RGWPSListNotifs_ObjStore : public RGWPSListNotifsOp {
-private:
- rgw_pubsub_bucket_topics result;
-
- int get_params() override {
- return notif_bucket_path(s->object->get_name(), bucket_name);
- }
-
-public:
- void execute(optional_yield y) override;
- void send_response() override {
- if (op_ret) {
- set_req_state_err(s, op_ret);
- }
- dump_errno(s);
- end_header(s, this, "application/json");
-
- if (op_ret < 0) {
- return;
- }
- encode_json("result", result, s->formatter);
- rgw_flush_formatter_and_reset(s, s->formatter);
- }
- const char* name() const override { return "pubsub_notifications_list"; }
-};
-
-void RGWPSListNotifs_ObjStore::execute(optional_yield y)
-{
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- auto b = ps->get_bucket(bucket_info.bucket);
- op_ret = b->get_topics(&result);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl;
- return;
- }
-}
-
-// ceph specific notification handler factory
-class RGWHandler_REST_PSNotifs : public RGWHandler_REST_S3 {
-protected:
- int init_permissions(RGWOp* op, optional_yield) override {
- return 0;
- }
-
- int read_permissions(RGWOp* op, optional_yield) override {
- return 0;
- }
- bool supports_quota() override {
- return false;
- }
- RGWOp *op_get() override {
- if (s->object->empty()) {
- return nullptr;
- }
- return new RGWPSListNotifs_ObjStore();
- }
- RGWOp *op_put() override {
- if (!s->object->empty()) {
- return new RGWPSCreateNotif_ObjStore();
- }
- return nullptr;
- }
- RGWOp *op_delete() override {
- if (!s->object->empty()) {
- return new RGWPSDeleteNotif_ObjStore();
- }
- return nullptr;
- }
-public:
- explicit RGWHandler_REST_PSNotifs(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
- virtual ~RGWHandler_REST_PSNotifs() = default;
-};
-
-// factory for ceph specific PubSub REST handlers
-RGWHandler_REST* RGWRESTMgr_PubSub::get_handler(rgw::sal::Store* store,
- req_state* const s,
- const rgw::auth::StrategyRegistry& auth_registry,
- const std::string& frontend_prefix)
-{
- if (RGWHandler_REST_S3::init_from_header(store, s, RGWFormat::JSON, true) < 0) {
- return nullptr;
- }
-
- RGWHandler_REST* handler{nullptr};
-
- // ceph specific PubSub API: topics/subscriptions/notification are reserved bucket names
- // this API is available only on RGW that belong to a pubsub zone
- if (s->init_state.url_bucket == "topics") {
- handler = new RGWHandler_REST_PSTopic(auth_registry);
- } else if (s->init_state.url_bucket == "subscriptions") {
- handler = new RGWHandler_REST_PSSub(auth_registry);
- } else if (s->init_state.url_bucket == "notifications") {
- handler = new RGWHandler_REST_PSNotifs(auth_registry);
- } else if (s->info.args.exists("notification")) {
- const int ret = RGWHandler_REST::allocate_formatter(s, RGWFormat::XML, true);
- if (ret == 0) {
- handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
- }
- }
-
- ldpp_dout(s, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
-
- return handler;
-}
-
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab ft=cpp
-
-#include "services/svc_zone.h"
-#include "rgw_common.h"
-#include "rgw_coroutine.h"
-#include "rgw_sync_module.h"
-#include "rgw_data_sync.h"
-#include "rgw_sync_module_pubsub.h"
-#include "rgw_sync_module_pubsub_rest.h"
-#include "rgw_rest_conn.h"
-#include "rgw_cr_rados.h"
-#include "rgw_cr_rest.h"
-#include "rgw_cr_tools.h"
-#include "rgw_op.h"
-#include "rgw_pubsub.h"
-#include "rgw_pubsub_push.h"
-#include "rgw_notify_event_type.h"
-#include "rgw_perf_counters.h"
-
-#include <boost/algorithm/hex.hpp>
-#include <boost/asio/yield.hpp>
-
-#define dout_subsys ceph_subsys_rgw
-
-
-#define PUBSUB_EVENTS_RETENTION_DEFAULT 7
-
-using namespace std;
-
-/*
-
-config:
-
-{
- "tenant": <tenant>, # default: <empty>
- "uid": <uid>, # default: "pubsub"
- "data_bucket_prefix": <prefix> # default: "pubsub-"
- "data_oid_prefix": <prefix> #
- "events_retention_days": <int> # default: 7
- "start_with_full_sync" <bool> # default: false
-}
-
-*/
-
-// utility function to convert the args list from string format
-// (ampresend separated with equal sign) to prased structure
-RGWHTTPArgs string_to_args(const std::string& str_args, const DoutPrefixProvider *dpp) {
- RGWHTTPArgs args;
- args.set(str_args);
- args.parse(dpp);
- return args;
-}
-
-struct PSSubConfig {
- std::string name;
- std::string topic;
- std::string push_endpoint_name;
- std::string push_endpoint_args;
- std::string data_bucket_name;
- std::string data_oid_prefix;
- std::string s3_id;
- std::string arn_topic;
- RGWPubSubEndpoint::Ptr push_endpoint;
-
- void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc, const DoutPrefixProvider *dpp) {
- name = uc.name;
- topic = uc.topic;
- push_endpoint_name = uc.dest.push_endpoint;
- data_bucket_name = uc.dest.bucket_name;
- data_oid_prefix = uc.dest.oid_prefix;
- s3_id = uc.s3_id;
- arn_topic = uc.dest.arn_topic;
- if (!push_endpoint_name.empty()) {
- push_endpoint_args = uc.dest.push_endpoint_args;
- try {
- push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args, dpp), cct);
- ldpp_dout(dpp, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
- } catch (const RGWPubSubEndpoint::configuration_error& e) {
- ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
- << push_endpoint_name << " due to: " << e.what() << dendl;
- }
- }
- }
-
- void dump(Formatter *f) const {
- encode_json("name", name, f);
- encode_json("topic", topic, f);
- encode_json("push_endpoint", push_endpoint_name, f);
- encode_json("push_endpoint_args", push_endpoint_args, f);
- encode_json("data_bucket_name", data_bucket_name, f);
- encode_json("data_oid_prefix", data_oid_prefix, f);
- encode_json("s3_id", s3_id, f);
- }
-
-};
-
-using PSSubConfigRef = std::shared_ptr<PSSubConfig>;
-
-struct PSTopicConfig {
- std::string name;
- std::set<std::string> subs;
- std::string opaque_data;
-
- void dump(Formatter *f) const {
- encode_json("name", name, f);
- encode_json("subs", subs, f);
- encode_json("opaque", opaque_data, f);
- }
-};
-
-struct PSNotificationConfig {
- uint64_t id{0};
- string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
- string topic;
- bool is_prefix{false};
-
-
- void dump(Formatter *f) const {
- encode_json("id", id, f);
- encode_json("path", path, f);
- encode_json("topic", topic, f);
- encode_json("is_prefix", is_prefix, f);
- }
-
- void init(CephContext *cct, const JSONFormattable& config) {
- path = config["path"];
- if (!path.empty() && path[path.size() - 1] == '*') {
- path = path.substr(0, path.size() - 1);
- is_prefix = true;
- }
- topic = config["topic"];
- }
-};
-
-template<class T>
-static string json_str(const char *name, const T& obj, bool pretty = false)
-{
- stringstream ss;
- JSONFormatter f(pretty);
-
- encode_json(name, obj, &f);
- f.flush(ss);
-
- return ss.str();
-}
-
-using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
-using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
-
-// global pubsub configuration
-struct PSConfig {
- const std::string id{"pubsub"};
- rgw_user user;
- std::string data_bucket_prefix;
- std::string data_oid_prefix;
- int events_retention_days{0};
- uint64_t sync_instance{0};
- bool start_with_full_sync{false};
-
- void dump(Formatter *f) const {
- encode_json("id", id, f);
- encode_json("user", user, f);
- encode_json("data_bucket_prefix", data_bucket_prefix, f);
- encode_json("data_oid_prefix", data_oid_prefix, f);
- encode_json("events_retention_days", events_retention_days, f);
- encode_json("sync_instance", sync_instance, f);
- encode_json("start_with_full_sync", start_with_full_sync, f);
- }
-
- void init(CephContext *cct, const JSONFormattable& config) {
- string uid = config["uid"]("pubsub");
- user = rgw_user(config["tenant"], uid);
- data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
- data_oid_prefix = config["data_oid_prefix"];
- events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
- start_with_full_sync = config["start_with_full_sync"](false);
-
- ldout(cct, 20) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
- }
-
- void init_instance(const RGWRealm& realm, uint64_t instance_id) {
- sync_instance = instance_id;
- }
-};
-
-using PSConfigRef = std::shared_ptr<PSConfig>;
-template<typename EventType>
-using EventRef = std::shared_ptr<EventType>;
-
-struct objstore_event {
- string id;
- const rgw_bucket& bucket;
- const rgw_obj_key& key;
- const ceph::real_time& mtime;
- const std::vector<std::pair<std::string, std::string> > *attrs;
-
- objstore_event(const rgw_bucket& _bucket,
- const rgw_obj_key& _key,
- const ceph::real_time& _mtime,
- const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket),
- key(_key),
- mtime(_mtime),
- attrs(_attrs) {}
-
- string get_hash() {
- string etag;
- RGWMD5Etag hash;
- hash.update(bucket.bucket_id);
- hash.update(key.name);
- hash.update(key.instance);
- hash.finish(&etag);
-
- assert(etag.size() > 8);
-
- return etag.substr(0, 8);
- }
-
- void dump(Formatter *f) const {
- {
- Formatter::ObjectSection s(*f, "bucket");
- encode_json("name", bucket.name, f);
- encode_json("tenant", bucket.tenant, f);
- encode_json("bucket_id", bucket.bucket_id, f);
- }
- {
- Formatter::ObjectSection s(*f, "key");
- encode_json("name", key.name, f);
- encode_json("instance", key.instance, f);
- }
- utime_t mt(mtime);
- encode_json("mtime", mt, f);
- Formatter::ObjectSection s(*f, "attrs");
- if (attrs) {
- for (auto& attr : *attrs) {
- encode_json(attr.first.c_str(), attr.second.c_str(), f);
- }
- }
- }
-};
-
-static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
- const rgw_obj_key& key,
- const ceph::real_time& mtime,
- const std::vector<std::pair<std::string, std::string> > *attrs,
- rgw::notify::EventType event_type,
- EventRef<rgw_pubsub_event> *event) {
- *event = std::make_shared<rgw_pubsub_event>();
-
- EventRef<rgw_pubsub_event>& e = *event;
- e->event_name = rgw::notify::to_ceph_string(event_type);
- e->source = bucket.name + "/" + key.name;
- e->timestamp = real_clock::now();
-
- objstore_event oevent(bucket, key, mtime, attrs);
-
- const utime_t ts(e->timestamp);
- set_event_id(e->id, oevent.get_hash(), ts);
-
- encode_json("info", oevent, &e->info);
-}
-
-static void make_s3_event_ref(CephContext *cct, const rgw_bucket& bucket,
- const rgw_user& owner,
- const rgw_obj_key& key,
- const ceph::real_time& mtime,
- const std::vector<std::pair<std::string, std::string>>* attrs,
- rgw::notify::EventType event_type,
- EventRef<rgw_pubsub_s3_event>* event) {
- *event = std::make_shared<rgw_pubsub_s3_event>();
-
- EventRef<rgw_pubsub_s3_event>& e = *event;
- e->eventTime = mtime;
- e->eventName = rgw::notify::to_event_string(event_type);
- // userIdentity: not supported in sync module
- // x_amz_request_id: not supported in sync module
- // x_amz_id_2: not supported in sync module
- // configurationId is filled from subscription configuration
- e->bucket_name = bucket.name;
- e->bucket_ownerIdentity = owner.to_str();
- e->bucket_arn = to_string(rgw::ARN(bucket));
- e->bucket_id = bucket.bucket_id; // rgw extension
- e->object_key = key.name;
- // object_size not supported in sync module
- objstore_event oevent(bucket, key, mtime, attrs);
- e->object_etag = oevent.get_hash();
- e->object_versionId = key.instance;
-
- // use timestamp as per key sequence id (hex encoded)
- const utime_t ts(real_clock::now());
- boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
- std::back_inserter(e->object_sequencer));
-
- set_event_id(e->id, e->object_etag, ts);
-}
-
-class PSManager;
-using PSManagerRef = std::shared_ptr<PSManager>;
-
-struct PSEnv {
- PSConfigRef conf;
- shared_ptr<RGWUserInfo> data_user_info;
- PSManagerRef manager;
-
- PSEnv() : conf(make_shared<PSConfig>()),
- data_user_info(make_shared<RGWUserInfo>()) {}
-
- void init(CephContext *cct, const JSONFormattable& config) {
- conf->init(cct, config);
- }
-
- void init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr);
-};
-
-using PSEnvRef = std::shared_ptr<PSEnv>;
-
-template<typename EventType>
-class PSEvent {
- const EventRef<EventType> event;
-
-public:
- PSEvent(const EventRef<EventType>& _event) : event(_event) {}
-
- void format(bufferlist *bl) const {
- bl->append(json_str("", *event));
- }
-
- void encode_event(bufferlist& bl) const {
- encode(*event, bl);
- }
-
- const string& id() const {
- return event->id;
- }
-};
-
-template <class T>
-class RGWSingletonCR : public RGWCoroutine {
- friend class WrapperCR;
-
- boost::asio::coroutine wrapper_state;
- bool started{false};
- int operate_ret{0};
-
- struct WaiterInfo {
- RGWCoroutine *cr{nullptr};
- T *result;
- };
- using WaiterInfoRef = std::shared_ptr<WaiterInfo>;
-
- deque<WaiterInfoRef> waiters;
-
- void add_waiter(RGWCoroutine *cr, T *result) {
- auto waiter = std::make_shared<WaiterInfo>();
- waiter->cr = cr;
- waiter->result = result;
- waiters.push_back(waiter);
- };
-
- bool get_next_waiter(WaiterInfoRef *waiter) {
- if (waiters.empty()) {
- waiter->reset();
- return false;
- }
-
- *waiter = waiters.front();
- waiters.pop_front();
- return true;
- }
-
- int operate_wrapper(const DoutPrefixProvider *dpp) override {
- reenter(&wrapper_state) {
- while (!is_done()) {
- ldpp_dout(dpp, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl;
- operate_ret = operate(dpp);
- if (operate_ret < 0) {
- ldpp_dout(dpp, 20) << *this << ": operate() returned r=" << operate_ret << dendl;
- }
- if (!is_done()) {
- yield;
- }
- }
-
- ldpp_dout(dpp, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl;
- /* we're done, can't yield anymore */
-
- WaiterInfoRef waiter;
- while (get_next_waiter(&waiter)) {
- ldpp_dout(dpp, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
- waiter->cr->set_retcode(retcode);
- waiter->cr->set_sleeping(false);
- return_result(dpp, waiter->result);
- put();
- }
-
- return retcode;
- }
- return 0;
- }
-
- virtual void return_result(const DoutPrefixProvider *dpp, T *result) {}
-
-public:
- RGWSingletonCR(CephContext *_cct)
- : RGWCoroutine(_cct) {}
-
- int execute(const DoutPrefixProvider *dpp, RGWCoroutine *caller, T *result = nullptr) {
- if (!started) {
- ldpp_dout(dpp, 20) << __func__ << "(): singleton not started, starting" << dendl;
- started = true;
- caller->call(this);
- return 0;
- } else if (!is_done()) {
- ldpp_dout(dpp, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
- get();
- add_waiter(caller, result);
- caller->set_sleeping(true);
- return 0;
- }
-
- ldpp_dout(dpp, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
- caller->set_retcode(retcode);
- return_result(dpp, result);
- return retcode;
- }
-};
-
-
-class PSSubscription;
-using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
-
-class PSSubscription {
- class InitCR;
- friend class InitCR;
- friend class RGWPSHandleObjEventCR;
-
- RGWDataSyncCtx *sc;
- RGWDataSyncEnv *sync_env;
- PSEnvRef env;
- PSSubConfigRef sub_conf;
- std::shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
- RGWBucketInfo *bucket_info{nullptr};
- RGWDataAccessRef data_access;
- RGWDataAccess::BucketRef bucket;
-
- InitCR *init_cr{nullptr};
-
- class InitBucketLifecycleCR : public RGWCoroutine {
- RGWDataSyncCtx *sc;
- RGWDataSyncEnv *sync_env;
- PSConfigRef& conf;
- LCRule rule;
-
- int retention_days;
-
- rgw_bucket_lifecycle_config_params lc_config;
-
- public:
- InitBucketLifecycleCR(RGWDataSyncCtx *_sc,
- PSConfigRef& _conf,
- rgw::sal::Bucket* _bucket) : RGWCoroutine(_sc->cct),
- sc(_sc), sync_env(_sc->env),
- conf(_conf) {
- lc_config.bucket = _bucket;
- lc_config.bucket_attrs = _bucket->get_attrs();
- retention_days = conf->events_retention_days;
- }
-
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
-
- rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days);
-
- {
- /* maybe we already have it configured? */
- RGWLifecycleConfiguration old_config;
- auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC);
- if (aiter != lc_config.bucket_attrs.end()) {
- bufferlist::const_iterator iter{&aiter->second};
- try {
- old_config.decode(iter);
- } catch (const buffer::error& e) {
- ldpp_dout(dpp, 0) << __func__ << "(): decode life cycle config failed" << dendl;
- }
- }
-
- auto old_rules = old_config.get_rule_map();
- for (auto ori : old_rules) {
- auto& old_rule = ori.second;
-
- if (old_rule.get_prefix().empty() &&
- old_rule.get_expiration().get_days() == retention_days &&
- old_rule.is_enabled()) {
- ldpp_dout(dpp, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl;
- return set_cr_done();
- }
- }
- }
-
- lc_config.config.add_rule(rule);
- yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados,
- sync_env->store,
- lc_config,
- dpp));
- if (retcode < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
- return set_cr_error(retcode);
- }
-
- return set_cr_done();
- }
- return 0;
- }
- };
-
- class InitCR : public RGWSingletonCR<bool> {
- RGWDataSyncCtx *sc;
- RGWDataSyncEnv *sync_env;
- PSSubscriptionRef sub;
- rgw_get_bucket_info_params get_bucket_info;
- rgw_bucket_create_local_params create_bucket;
- PSConfigRef& conf;
- PSSubConfigRef& sub_conf;
- int i;
-
- public:
- InitCR(RGWDataSyncCtx *_sc,
- PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sc->cct),
- sc(_sc), sync_env(_sc->env),
- sub(_sub), conf(sub->env->conf),
- sub_conf(sub->sub_conf) {
- }
-
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- get_bucket_info.tenant = conf->user.tenant;
- get_bucket_info.bucket_name = sub_conf->data_bucket_name;
- sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>();
-
- for (i = 0; i < 2; ++i) {
- yield call(new RGWGetBucketInfoCR(sync_env->async_rados,
- sync_env->store,
- get_bucket_info,
- sub->get_bucket_info_result,
- dpp));
- if (retcode < 0 && retcode != -ENOENT) {
- ldpp_dout(dpp, 1) << "ERROR: failed to geting bucket info: " << "tenant="
- << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
- }
- if (retcode == 0) {
- {
- auto& result = sub->get_bucket_info_result;
- sub->bucket_info = &result->bucket->get_info();
-
- int ret = sub->data_access->get_bucket(result->bucket->get_info(), result->bucket->get_attrs(), &sub->bucket);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: data_access.get_bucket() bucket=" << result->bucket << " failed, ret=" << ret << dendl;
- return set_cr_error(ret);
- }
- }
-
- yield call(new InitBucketLifecycleCR(sc, conf,
- sub->get_bucket_info_result->bucket.get()));
- if (retcode < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
- return set_cr_error(retcode);
- }
-
- return set_cr_done();
- }
-
- create_bucket.user_info = sub->env->data_user_info;
- create_bucket.bucket_name = sub_conf->data_bucket_name;
- ldpp_dout(dpp, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
- yield call(new RGWBucketCreateLocalCR(sync_env->async_rados,
- sync_env->store,
- create_bucket,
- dpp));
- if (retcode < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to create bucket: " << "tenant="
- << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
- return set_cr_error(retcode);
- }
-
- /* second iteration: we got -ENOENT and created a bucket */
- }
-
- /* failed twice on -ENOENT, unexpected */
- ldpp_dout(dpp, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
- << " name=" << get_bucket_info.bucket_name << dendl;
- return set_cr_error(-EIO);
- }
- return 0;
- }
- };
-
- template<typename EventType>
- class StoreEventCR : public RGWCoroutine {
- RGWDataSyncCtx* const sc;
- RGWDataSyncEnv* const sync_env;
- const PSSubscriptionRef sub;
- const PSEvent<EventType> pse;
- const string oid_prefix;
-
- public:
- StoreEventCR(RGWDataSyncCtx* const _sc,
- const PSSubscriptionRef& _sub,
- const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct),
- sc(_sc), sync_env(_sc->env),
- sub(_sub),
- pse(_event),
- oid_prefix(sub->sub_conf->data_oid_prefix) {
- }
-
- int operate(const DoutPrefixProvider *dpp) override {
- rgw_object_simple_put_params put_obj;
- reenter(this) {
-
- put_obj.bucket = sub->bucket;
- put_obj.key = rgw_obj_key(oid_prefix + pse.id());
-
- pse.format(&put_obj.data);
-
- {
- bufferlist bl;
- pse.encode_event(bl);
- bufferlist bl64;
- bl.encode_base64(bl64);
- put_obj.user_data = bl64.to_str();
- }
-
- yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
- sync_env->store,
- put_obj,
- dpp));
- if (retcode < 0) {
- ldpp_dout(dpp, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
- return set_cr_error(retcode);
- } else {
- ldpp_dout(dpp, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl;
- }
-
- return set_cr_done();
- }
- return 0;
- }
- };
-
- template<typename EventType>
- class PushEventCR : public RGWCoroutine {
- RGWDataSyncCtx* const sc;
- RGWDataSyncEnv* const sync_env;
- const EventRef<EventType> event;
- const PSSubConfigRef& sub_conf;
-
- public:
- PushEventCR(RGWDataSyncCtx* const _sc,
- const PSSubscriptionRef& _sub,
- const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct),
- sc(_sc), sync_env(_sc->env),
- event(_event),
- sub_conf(_sub->sub_conf) {
- }
-
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- ceph_assert(sub_conf->push_endpoint);
- yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env));
-
- if (retcode < 0) {
- ldpp_dout(dpp, 10) << "failed to push event: " << event->id <<
- " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl;
- return set_cr_error(retcode);
- }
-
- ldpp_dout(dpp, 20) << "event: " << event->id <<
- " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl;
- return set_cr_done();
- }
- return 0;
- }
- };
-
-public:
- PSSubscription(RGWDataSyncCtx *_sc,
- PSEnvRef _env,
- PSSubConfigRef& _sub_conf) : sc(_sc), sync_env(_sc->env),
- env(_env),
- sub_conf(_sub_conf),
- data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
-
- PSSubscription(RGWDataSyncCtx *_sc,
- PSEnvRef _env,
- rgw_pubsub_sub_config& user_sub_conf) : sc(_sc), sync_env(_sc->env),
- env(_env),
- sub_conf(std::make_shared<PSSubConfig>()),
- data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
- sub_conf->from_user_conf(sync_env->cct, user_sub_conf, sync_env->dpp);
- }
- virtual ~PSSubscription() {
- if (init_cr) {
- init_cr->put();
- }
- }
-
- template <class C>
- static PSSubscriptionRef get_shared(RGWDataSyncCtx *_sc,
- PSEnvRef _env,
- C& _sub_conf) {
- auto sub = std::make_shared<PSSubscription>(_sc, _env, _sub_conf);
- sub->init_cr = new InitCR(_sc, sub);
- sub->init_cr->get();
- return sub;
- }
-
- int call_init_cr(const DoutPrefixProvider *dpp, RGWCoroutine *caller) {
- return init_cr->execute(dpp, caller);
- }
-
- template<typename EventType>
- static RGWCoroutine *store_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
- return new StoreEventCR<EventType>(sc, sub, event);
- }
-
- template<typename EventType>
- static RGWCoroutine *push_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
- return new PushEventCR<EventType>(sc, sub, event);
- }
- friend class InitCR;
-};
-
-class PSManager
-{
- RGWDataSyncCtx *sc;
- RGWDataSyncEnv *sync_env;
- PSEnvRef env;
-
- std::map<string, PSSubscriptionRef> subs;
-
- class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
- RGWDataSyncCtx *sc;
- RGWDataSyncEnv *sync_env;
- PSManagerRef mgr;
- rgw_user owner;
- string sub_name;
- string sub_id;
- PSSubscriptionRef *ref;
-
- PSConfigRef conf;
-
- PSSubConfigRef sub_conf;
- rgw_pubsub_sub_config user_sub_conf;
-
- public:
- GetSubCR(RGWDataSyncCtx *_sc,
- PSManagerRef& _mgr,
- const rgw_user& _owner,
- const string& _sub_name,
- PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sc->cct),
- sc(_sc), sync_env(_sc->env),
- mgr(_mgr),
- owner(_owner),
- sub_name(_sub_name),
- ref(_ref),
- conf(mgr->env->conf) {
- }
- ~GetSubCR() { }
-
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- if (owner.empty()) {
- ldpp_dout(dpp, 1) << "ERROR: missing user info when getting subscription: " << sub_name << dendl;
- mgr->remove_get_sub(owner, sub_name);
- return set_cr_error(-EINVAL);
- } else {
- using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
- yield {
- RGWPubSub ps(sync_env->store, owner.tenant);
- rgw_raw_obj obj;
- ps.get_sub_meta_obj(sub_name, &obj);
- bool empty_on_enoent = false;
- call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
- obj,
- &user_sub_conf, empty_on_enoent));
- }
- if (retcode < 0) {
- mgr->remove_get_sub(owner, sub_name);
- return set_cr_error(retcode);
- }
-
- *ref = PSSubscription::get_shared(sc, mgr->env, user_sub_conf);
- }
-
- yield (*ref)->call_init_cr(dpp, this);
- if (retcode < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name << dendl;
- mgr->remove_get_sub(owner, sub_name);
- return set_cr_error(retcode);
- }
-
- mgr->remove_get_sub(owner, sub_name);
-
- return set_cr_done();
- }
- return 0;
- }
-
- void return_result(const DoutPrefixProvider *dpp, PSSubscriptionRef *result) override {
- ldpp_dout(dpp, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
- if (retcode >= 0) {
- *result = *ref;
- }
- }
- };
-
- string sub_id(const rgw_user& owner, const string& sub_name) {
- string owner_prefix;
- if (!owner.empty()) {
- owner_prefix = owner.to_str() + "/";
- }
-
- return owner_prefix + sub_name;
- }
-
- std::map<std::string, GetSubCR *> get_subs;
-
- GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) {
- return get_subs[sub_id(owner, name)];
- }
-
- void remove_get_sub(const rgw_user& owner, const string& name) {
- get_subs.erase(sub_id(owner, name));
- }
-
- bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) {
- auto iter = subs.find(sub_id(owner, sub_name));
- if (iter != subs.end()) {
- *sub = iter->second;
- return true;
- }
- return false;
- }
-
- PSManager(RGWDataSyncCtx *_sc,
- PSEnvRef _env) : sc(_sc), sync_env(_sc->env),
- env(_env) {}
-
-public:
- static PSManagerRef get_shared(RGWDataSyncCtx *_sc,
- PSEnvRef _env) {
- return std::shared_ptr<PSManager>(new PSManager(_sc, _env));
- }
-
- static int call_get_subscription_cr(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, PSManagerRef& mgr,
- RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
- if (mgr->find_sub_instance(owner, sub_name, ref)) {
- /* found it! nothing to execute */
- ldpp_dout(dpp, 20) << __func__ << "(): found sub instance" << dendl;
- }
- auto& gs = mgr->get_get_subs(owner, sub_name);
- if (!gs) {
- ldpp_dout(dpp, 20) << __func__ << "(): first get subs" << dendl;
- gs = new GetSubCR(sc, mgr, owner, sub_name, ref);
- }
- ldpp_dout(dpp, 20) << __func__ << "(): executing get subs" << dendl;
- return gs->execute(dpp, caller, ref);
- }
-
- friend class GetSubCR;
-};
-
-void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) {
- manager = mgr;
- conf->init_instance(realm, instance_id);
-}
-
-class RGWPSInitEnvCBCR : public RGWCoroutine {
- RGWDataSyncCtx *sc;
- RGWDataSyncEnv *sync_env;
- PSEnvRef env;
- PSConfigRef& conf;
-
- rgw_user_create_params create_user;
- rgw_get_user_info_params get_user_info;
-public:
- RGWPSInitEnvCBCR(RGWDataSyncCtx *_sc,
- PSEnvRef& _env) : RGWCoroutine(_sc->cct),
- sc(_sc), sync_env(_sc->env),
- env(_env), conf(env->conf) {}
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- ldpp_dout(dpp, 1) << ": init pubsub config zone=" << sc->source_zone << dendl;
-
- /* nothing to do here right now */
- create_user.user = conf->user;
- create_user.max_buckets = 0; /* unlimited */
- create_user.display_name = "pubsub";
- create_user.generate_key = false;
- yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user, dpp));
- if (retcode < 0 && retcode != -ERR_USER_EXIST) {
- ldpp_dout(dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
- return set_cr_error(retcode);
- }
-
- get_user_info.user = conf->user;
- yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info, dpp));
- if (retcode < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
- return set_cr_error(retcode);
- }
-
- ldpp_dout(dpp, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl;
-
-
- return set_cr_done();
- }
- return 0;
- }
-};
-
-bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) {
- if (!match(filter.events, event_type)) {
- return false;
- }
- if (!match(filter.s3_filter.key_filter, key_name)) {
- return false;
- }
- return true;
-}
-
-class RGWPSFindBucketTopicsCR : public RGWCoroutine {
- RGWDataSyncCtx *sc;
- RGWDataSyncEnv *sync_env;
- PSEnvRef env;
- rgw_user owner;
- rgw_bucket bucket;
- rgw_obj_key key;
- rgw::notify::EventType event_type;
-
- RGWPubSub ps;
-
- rgw_raw_obj bucket_obj;
- rgw_raw_obj user_obj;
- rgw_pubsub_bucket_topics bucket_topics;
- rgw_pubsub_topics user_topics;
- TopicsRef *topics;
-public:
- RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
- PSEnvRef& _env,
- const rgw_user& _owner,
- const rgw_bucket& _bucket,
- const rgw_obj_key& _key,
- rgw::notify::EventType _event_type,
- TopicsRef *_topics) : RGWCoroutine(_sc->cct),
- sc(_sc), sync_env(_sc->env),
- env(_env),
- owner(_owner),
- bucket(_bucket),
- key(_key),
- event_type(_event_type),
- ps(sync_env->store, owner.tenant),
- topics(_topics) {
- *topics = std::make_shared<vector<PSTopicConfigRef> >();
- }
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- ps.get_bucket_meta_obj(bucket, &bucket_obj);
- ps.get_meta_obj(&user_obj);
-
- using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
- yield {
- bool empty_on_enoent = true;
- call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
- bucket_obj,
- &bucket_topics, empty_on_enoent));
- }
- if (retcode < 0 && retcode != -ENOENT) {
- return set_cr_error(retcode);
- }
-
- ldpp_dout(dpp, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
-
- if (!bucket_topics.topics.empty()) {
- using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_topics>;
- yield {
- bool empty_on_enoent = true;
- call(new ReadUserTopicsInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
- user_obj,
- &user_topics, empty_on_enoent));
- }
- if (retcode < 0 && retcode != -ENOENT) {
- return set_cr_error(retcode);
- }
- }
-
- for (auto& titer : bucket_topics.topics) {
- auto& topic_filter = titer.second;
- auto& info = topic_filter.topic;
- if (!match(topic_filter, key.name, event_type)) {
- continue;
- }
- std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
- tc->name = info.name;
- tc->subs = user_topics.topics[info.name].subs;
- tc->opaque_data = info.opaque_data;
- (*topics)->push_back(tc);
- }
-
- return set_cr_done();
- }
- return 0;
- }
-};
-
-class RGWPSHandleObjEventCR : public RGWCoroutine {
- RGWDataSyncCtx* const sc;
- const PSEnvRef env;
- const rgw_user owner;
- const EventRef<rgw_pubsub_event> event;
- const EventRef<rgw_pubsub_s3_event> s3_event;
- const TopicsRef topics;
- bool has_subscriptions;
- bool event_handled;
- PSSubscriptionRef sub;
- std::vector<PSTopicConfigRef>::const_iterator titer;
- std::set<std::string>::const_iterator siter;
-
-public:
- RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc,
- const PSEnvRef _env,
- const rgw_user& _owner,
- const EventRef<rgw_pubsub_event>& _event,
- const EventRef<rgw_pubsub_s3_event>& _s3_event,
- const TopicsRef& _topics) : RGWCoroutine(_sc->cct),
- sc(_sc),
- env(_env),
- owner(_owner),
- event(_event),
- s3_event(_s3_event),
- topics(_topics),
- has_subscriptions(false),
- event_handled(false) {}
-
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- ldpp_dout(dpp, 20) << ": handle event: obj: z=" << sc->source_zone
- << " event=" << json_str("event", *event, false)
- << " owner=" << owner << dendl;
-
- ldpp_dout(dpp, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
-
- // outside caller should check that
- ceph_assert(!topics->empty());
-
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
-
- // loop over all topics related to the bucket/object
- for (titer = topics->begin(); titer != topics->end(); ++titer) {
- ldpp_dout(dpp, 20) << ": notification for " << event->source << ": topic=" <<
- (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
- // loop over all subscriptions of the topic
- for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
- ldpp_dout(dpp, 20) << ": subscription: " << *siter << dendl;
- has_subscriptions = true;
- // try to read subscription configuration
- yield PSManager::call_get_subscription_cr(dpp, sc, env->manager, this, owner, *siter, &sub);
- if (retcode < 0) {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
- ldpp_dout(dpp, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
- << " ret=" << retcode << dendl;
- if (retcode == -ENOENT) {
- // missing subscription info should be reflected back as invalid argument
- // and not as missing object
- retcode = -EINVAL;
- }
- // try the next subscription
- continue;
- }
- if (sub->sub_conf->s3_id.empty()) {
- // subscription was not made by S3 compatible API
- ldpp_dout(dpp, 20) << "storing event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
- yield call(PSSubscription::store_event_cr(sc, sub, event));
- if (retcode < 0) {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
- ldpp_dout(dpp, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
- } else {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
- event_handled = true;
- }
- if (sub->sub_conf->push_endpoint) {
- ldpp_dout(dpp, 20) << "push event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
- yield call(PSSubscription::push_event_cr(sc, sub, event));
- if (retcode < 0) {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
- ldpp_dout(dpp, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
- } else {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
- event_handled = true;
- }
- }
- } else {
- // subscription was made by S3 compatible API
- ldpp_dout(dpp, 20) << "storing s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
- s3_event->configurationId = sub->sub_conf->s3_id;
- s3_event->opaque_data = (*titer)->opaque_data;
- yield call(PSSubscription::store_event_cr(sc, sub, s3_event));
- if (retcode < 0) {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
- ldpp_dout(dpp, 1) << "ERROR: failed to store s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
- } else {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
- event_handled = true;
- }
- if (sub->sub_conf->push_endpoint) {
- ldpp_dout(dpp, 20) << "push s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
- yield call(PSSubscription::push_event_cr(sc, sub, s3_event));
- if (retcode < 0) {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
- ldpp_dout(dpp, 1) << "ERROR: failed to push s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
- } else {
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
- event_handled = true;
- }
- }
- }
- }
- }
- if (has_subscriptions && !event_handled) {
- // event is considered "lost" of it has subscriptions on any of its topics
- // but it was not stored in, or pushed to, any of them
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
- }
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
- return set_cr_done();
- }
- return 0;
- }
-};
-
-// coroutine invoked on remote object creation
-class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
- RGWDataSyncCtx *sc;
- rgw_bucket_sync_pipe sync_pipe;
- PSEnvRef env;
- std::optional<uint64_t> versioned_epoch;
- EventRef<rgw_pubsub_event> event;
- EventRef<rgw_pubsub_s3_event> s3_event;
- TopicsRef topics;
-public:
- RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
- rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
- PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
- TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
- sc(_sc),
- sync_pipe(_sync_pipe),
- env(_env),
- versioned_epoch(_versioned_epoch),
- topics(_topics) {
- }
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- ldpp_dout(dpp, 20) << ": stat of remote obj: z=" << sc->source_zone
- << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
- << " attrs=" << attrs << dendl;
- {
- std::vector<std::pair<std::string, std::string> > attrs;
- for (auto& attr : attrs) {
- std::string k = attr.first;
- if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
- k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
- }
- attrs.push_back(std::make_pair(k, attr.second));
- }
- // at this point we don't know whether we need the ceph event or S3 event
- // this is why both are created here, once we have information about the
- // subscription, we will store/push only the relevant ones
- make_event_ref(sc->cct,
- sync_pipe.info.source_bs.bucket, key,
- mtime, &attrs,
- rgw::notify::ObjectCreated, &event);
- make_s3_event_ref(sc->cct,
- sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
- mtime, &attrs,
- rgw::notify::ObjectCreated, &s3_event);
- }
-
- yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, s3_event, topics));
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
- return set_cr_done();
- }
- return 0;
- }
-};
-
-class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
- rgw_bucket_sync_pipe sync_pipe;
- PSEnvRef env;
- std::optional<uint64_t> versioned_epoch;
- TopicsRef topics;
-public:
- RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
- rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
- PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
- TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
- sync_pipe(_sync_pipe),
- env(_env), versioned_epoch(_versioned_epoch),
- topics(_topics) {
- }
-
- ~RGWPSHandleRemoteObjCR() override {}
-
- RGWStatRemoteObjCBCR *allocate_callback() override {
- return new RGWPSHandleRemoteObjCBCR(sc, sync_pipe, key, env, versioned_epoch, topics);
- }
-};
-
-class RGWPSHandleObjCreateCR : public RGWCoroutine {
- RGWDataSyncCtx *sc;
- rgw_bucket_sync_pipe sync_pipe;
- rgw_obj_key key;
- PSEnvRef env;
- std::optional<uint64_t> versioned_epoch;
- TopicsRef topics;
-public:
- RGWPSHandleObjCreateCR(RGWDataSyncCtx *_sc,
- rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
- PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sc->cct),
- sc(_sc),
- sync_pipe(_sync_pipe),
- key(_key),
- env(_env),
- versioned_epoch(_versioned_epoch) {
- }
-
- ~RGWPSHandleObjCreateCR() override {}
-
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- yield call(new RGWPSFindBucketTopicsCR(sc, env, sync_pipe.dest_bucket_info.owner,
- sync_pipe.info.source_bs.bucket, key,
- rgw::notify::ObjectCreated,
- &topics));
- if (retcode < 0) {
- ldpp_dout(dpp, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
- return set_cr_error(retcode);
- }
- if (topics->empty()) {
- ldpp_dout(dpp, 20) << "no topics found for " << sync_pipe.info.source_bs.bucket << "/" << key << dendl;
- return set_cr_done();
- }
- yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics));
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
- return set_cr_done();
- }
- return 0;
- }
-};
-
-// coroutine invoked on remote object deletion
-class RGWPSGenericObjEventCBCR : public RGWCoroutine {
- RGWDataSyncCtx *sc;
- PSEnvRef env;
- rgw_user owner;
- rgw_bucket bucket;
- rgw_obj_key key;
- ceph::real_time mtime;
- rgw::notify::EventType event_type;
- EventRef<rgw_pubsub_event> event;
- EventRef<rgw_pubsub_s3_event> s3_event;
- TopicsRef topics;
-public:
- RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc,
- PSEnvRef _env,
- rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
- rgw::notify::EventType _event_type) : RGWCoroutine(_sc->cct),
- sc(_sc),
- env(_env),
- owner(_sync_pipe.dest_bucket_info.owner),
- bucket(_sync_pipe.dest_bucket_info.bucket),
- key(_key),
- mtime(_mtime), event_type(_event_type) {}
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- ldpp_dout(dpp, 20) << ": remove remote obj: z=" << sc->source_zone
- << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
- yield call(new RGWPSFindBucketTopicsCR(sc, env, owner, bucket, key, event_type, &topics));
- if (retcode < 0) {
- ldpp_dout(dpp, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
- return set_cr_error(retcode);
- }
- if (topics->empty()) {
- ldpp_dout(dpp, 20) << "no topics found for " << bucket << "/" << key << dendl;
- return set_cr_done();
- }
- // at this point we don't know whether we need the ceph event or S3 event
- // this is why both are created here, once we have information about the
- // subscription, we will store/push only the relevant ones
- make_event_ref(sc->cct,
- bucket, key,
- mtime, nullptr,
- event_type, &event);
- make_s3_event_ref(sc->cct,
- bucket, owner, key,
- mtime, nullptr,
- event_type, &s3_event);
- yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, s3_event, topics));
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
- return set_cr_done();
- }
- return 0;
- }
-
-};
-
-class RGWPSDataSyncModule : public RGWDataSyncModule {
- PSEnvRef env;
- PSConfigRef& conf;
-
-public:
- RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) {
- env->init(cct, config);
- }
-
- ~RGWPSDataSyncModule() override {}
-
- void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
- auto sync_env = sc->env;
- PSManagerRef mgr = PSManager::get_shared(sc, env);
- env->init_instance(sync_env->svc->zone->get_realm(), instance_id, mgr);
- }
-
- RGWCoroutine *start_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) override {
- ldpp_dout(dpp, 5) << conf->id << ": start" << dendl;
- return new RGWPSInitEnvCBCR(sc, env);
- }
-
- RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
- rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
- ldpp_dout(dpp, 10) << conf->id << ": sync_object: b=" << sync_pipe <<
- " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
- return new RGWPSHandleObjCreateCR(sc, sync_pipe, key, env, versioned_epoch);
- }
-
- RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
- rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldpp_dout(dpp, 10) << conf->id << ": rm_object: b=" << sync_pipe <<
- " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete);
- }
-
- RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
- rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldpp_dout(dpp, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe <<
- " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
- }
-
- PSConfigRef& get_conf() { return conf; }
-};
-
-RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config)
-{
- data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
- const std::string jconf = json_str("conf", *data_handler->get_conf());
- JSONParser p;
- if (!p.parse(jconf.c_str(), jconf.size())) {
- ldpp_dout(dpp, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
- effective_conf = config;
- } else {
- effective_conf.decode_json(&p);
- }
-}
-
-RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
-{
- return data_handler.get();
-}
-
-RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
- if (dialect != RGW_REST_S3) {
- return orig;
- }
- return new RGWRESTMgr_PubSub();
-}
-
-bool RGWPSSyncModuleInstance::should_full_sync() const {
- return data_handler->get_conf()->start_with_full_sync;
-}
-
-int RGWPSSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
- instance->reset(new RGWPSSyncModuleInstance(dpp, cct, config));
- return 0;
-}
-
-