ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
op_ret = ups->create_topic(topic_name);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to create topic, ret=" << op_ret << dendl;
return;
}
}
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
op_ret = ups->get_user_topics(&result);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
return;
}
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
op_ret = ups->get_topic(topic_name, &result);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to get topic, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to get topic, ret=" << op_ret << dendl;
return;
}
}
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
op_ret = ups->remove_topic(topic_name);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to remove topic, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to remove topic, ret=" << op_ret << dendl;
return;
}
}
auto sub = ups->get_sub(sub_name);
op_ret = sub->subscribe(topic_name, dest);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to create subscription, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to create subscription, ret=" << op_ret << dendl;
return;
}
}
topic_name = s->info.args.get("topic", &exists);
if (!exists) {
- ldout(s->cct, 20) << "ERROR: missing required param 'topic' for request" << dendl;
+ ldout(s->cct, 1) << "missing required param 'topic' for request" << dendl;
return -EINVAL;
}
auto sub = ups->get_sub(sub_name);
op_ret = sub->get_conf(&result);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl;
return;
}
}
auto sub = ups->get_sub(sub_name);
op_ret = sub->unsubscribe(topic_name);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to remove subscription, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to remove subscription, ret=" << op_ret << dendl;
return;
}
}
auto sub = ups->get_sub(sub_name);
op_ret = sub->remove_event(event_id);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to ack event, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to ack event, ret=" << op_ret << dendl;
return;
}
}
event_id = s->info.args.get("event-id", &exists);
if (!exists) {
- ldout(s->cct, 20) << "ERROR: missing required param 'event-id' for request" << dendl;
+ ldout(s->cct, 1) << "missing required param 'event-id' for request" << dendl;
return -EINVAL;
}
return 0;
auto sub = ups->get_sub(sub_name);
op_ret = sub->list_events(marker, max_entries, &result);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl;
return;
}
}
#define DEFAULT_MAX_ENTRIES 100
int ret = s->info.args.get_int("max-entries", &max_entries, DEFAULT_MAX_ENTRIES);
if (ret < 0) {
- ldout(s->cct, 20) << "failed to parse 'max-entries' param" << dendl;
+ ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl;
return -EINVAL;
}
return 0;
return ret;
}
- ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
+ const auto& id = s->owner.get_id();
+
+ ret = store->get_bucket_info(*s->sysobj_ctx, id.tenant, bucket_name,
bucket_info, nullptr, nullptr);
if (ret < 0) {
return ret;
}
- if (bucket_info.owner != s->owner.get_id()) {
- ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+ if (bucket_info.owner != id) {
+ ldout(s->cct, 1) << "user doesn't own bucket, cannot create notification" << dendl;
return -EPERM;
}
return 0;
void pre_exec() override {
rgw_bucket_object_pre_exec(s);
}
- void execute() override;
- const char* name() const override { return "pubsub_notification_create"; }
- virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
- virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
virtual int get_params() = 0;
};
-void RGWPSCreateNotifOp::execute()
+
+// GCP style notification creation
+// Command: PUT /notification/bucket/<bucket name>?topic=<topic name>
+// "topic" has to be created beforehand
+class RGWPSCreateNotif_ObjStore_GCP : public RGWPSCreateNotifOp {
+public:
+ explicit RGWPSCreateNotif_ObjStore_GCP() {}
+
+ const char* name() const override { return "pubsub_notification_create_gcp"; }
+
+ void execute() override;
+
+ int get_params() override {
+ bool exists;
+ topic_name = s->info.args.get("topic", &exists);
+ if (!exists) {
+ ldout(s->cct, 1) << "param 'topic' not provided" << dendl;
+ return -EINVAL;
+ }
+
+ string events_str = s->info.args.get("events", &exists);
+ if (exists) {
+ get_str_set(events_str, ",", events);
+ }
+ return notif_bucket_path(s->object.name, &bucket_name);
+ }
+};
+
+void RGWPSCreateNotif_ObjStore_GCP::execute()
{
op_ret = get_params();
if (op_ret < 0) {
}
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+
auto b = ups->get_bucket(bucket_info.bucket);
op_ret = b->create_notification(topic_name, events);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to create notification, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to create notification, ret=" << op_ret << dendl;
return;
}
}
+// S3 compliant notification creation
+// Command: PUT /<bucket name>?notification
+// a "topic" will be auto-generated
class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
+protected:
+ std::string topic_name;
+
public:
explicit RGWPSCreateNotif_ObjStore_S3() {}
+ const char* name() const override { return "pubsub_notification_create_s3"; }
+
+ void execute() override;
+
int get_params() override {
bool exists;
- topic_name = s->info.args.get("topic", &exists);
+ auto no_value = s->info.args.get("notification", &exists);
if (!exists) {
- ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
+ ldout(s->cct, 20) << "param 'notification' not provided" << dendl;
+ return -EINVAL;
+ } else if (no_value.length() > 0) {
+ ldout(s->cct, 1) << "param 'notification' should not be set with value" << dendl;
return -EINVAL;
}
- string events_str = s->info.args.get("events", &exists);
- if (exists) {
- get_str_set(events_str, ",", events);
+ if (s->bucket_name.empty()) {
+ ldout(s->cct, 1) << "notification must be set on a bucket" << dendl;
+ return -EINVAL;
}
- return notif_bucket_path(s->object.name, &bucket_name);
+
+ bucket_name = s->bucket_name;
+
+ const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+
+ int r = 0;
+ bufferlist data;
+ std::tie(r, data) = rgw_rest_read_all_input(s, max_size);
+
+ if (r < 0) {
+ ldout(s->cct, 1) << "failed to read notification parameters from payload" << dendl;
+ return r;
+ }
+ if (data.length() == 0) {
+ ldout(s->cct, 1) << "payload missing for notification" << dendl;
+ return -EINVAL;
+ }
+
+ RGWXMLParser parser;
+
+ if (!parser.init()){
+ ldout(s->cct, 1) << "failed to initialize XML parser" << dendl;
+ return -EINVAL;
+ }
+
+ if (!parser.parse(data.c_str(), data.length(), 1)) {
+ ldout(s->cct, 1) << "failed to parse XML payload or notification" << dendl;
+ return -ERR_MALFORMED_XML;
+ }
+
+ try {
+ } catch (RGWXMLDecoder::err& err) {
+ ldout(s->cct, 5) << "Malformed tagging request: " << err << dendl;
+ return -ERR_MALFORMED_XML;
+ }
+ return 0;
}
};
+void RGWPSCreateNotif_ObjStore_S3::execute()
+{
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+
+ ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+
+ op_ret = ups->create_topic(topic_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to auto-generate topic for notification, ret=" << op_ret << dendl;
+ return;
+ }
+ auto b = ups->get_bucket(bucket_info.bucket);
+ op_ret = b->create_notification(topic_name, events);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to create notification, ret=" << op_ret << dendl;
+ return;
+ }
+}
+
class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
protected:
std::unique_ptr<RGWUserPubSub> ups;
}
if (bucket_info.owner != s->owner.get_id()) {
- ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+ ldout(s->cct, 1) << "user doesn't own bucket, cannot create topic" << dendl;
return -EPERM;
}
return 0;
auto b = ups->get_bucket(bucket_info.bucket);
op_ret = b->remove_notification(topic_name);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to remove notification, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to remove notification, ret=" << op_ret << dendl;
return;
}
}
-class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
+class RGWPSDeleteNotif_ObjStore_GCP : public RGWPSDeleteNotifOp {
public:
- explicit RGWPSDeleteNotif_ObjStore_S3() {}
+ explicit RGWPSDeleteNotif_ObjStore_GCP() {}
int get_params() override {
bool exists;
topic_name = s->info.args.get("topic", &exists);
if (!exists) {
- ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
+ ldout(s->cct, 1) << "param 'topic' not provided" << dendl;
return -EINVAL;
}
return notif_bucket_path(s->object.name, &bucket_name);
}
if (bucket_info.owner != s->owner.get_id()) {
- ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+ ldout(s->cct, 1) << "user doesn't own bucket, cannot create topic" << dendl;
return -EPERM;
}
auto b = ups->get_bucket(bucket_info.bucket);
op_ret = b->get_topics(&result);
if (op_ret < 0) {
- ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
return;
}
}
-class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
+class RGWPSListNotifs_ObjStore_GCP : public RGWPSListNotifsOp {
public:
- explicit RGWPSListNotifs_ObjStore_S3() {}
+ explicit RGWPSListNotifs_ObjStore_GCP() {}
int get_params() override {
return notif_bucket_path(s->object.name, &bucket_name);
};
-class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
+// GCP-style notification handler factory
+class RGWHandler_REST_PSNotifs_GCP : public RGWHandler_REST_S3 {
protected:
int init_permissions(RGWOp* op) override {
return 0;
if (s->object.empty()) {
return nullptr;
}
- return new RGWPSListNotifs_ObjStore_S3();
+ return new RGWPSListNotifs_ObjStore_GCP();
}
RGWOp *op_put() override {
if (!s->object.empty()) {
- return new RGWPSCreateNotif_ObjStore_S3();
+ return new RGWPSCreateNotif_ObjStore_GCP();
}
return nullptr;
}
RGWOp *op_delete() override {
if (!s->object.empty()) {
- return new RGWPSDeleteNotif_ObjStore_S3();
+ return new RGWPSDeleteNotif_ObjStore_GCP();
}
return nullptr;
}
+public:
+ explicit RGWHandler_REST_PSNotifs_GCP(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+ virtual ~RGWHandler_REST_PSNotifs_GCP() {}
+};
+
+// S3-compliant notification handler factory
+class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
+protected:
+ int init_permissions(RGWOp* op) override {
+ return 0;
+ }
+
+ int read_permissions(RGWOp* op) override {
+ return 0;
+ }
+ bool supports_quota() override {
+ return false;
+ }
+ RGWOp *op_get() override {
+ return nullptr;
+ }
+ RGWOp *op_put() override {
+ return new RGWPSCreateNotif_ObjStore_S3();
+ }
+ RGWOp *op_delete() override {
+ return nullptr;
+ }
public:
explicit RGWHandler_REST_PSNotifs_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
virtual ~RGWHandler_REST_PSNotifs_S3() {}
};
-
+// factory for PubSub REST handlers
RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s,
const rgw::auth::StrategyRegistry& auth_registry,
const std::string& frontend_prefix)
return nullptr;
}
- RGWHandler_REST *handler = nullptr;;
+ RGWHandler_REST *handler = nullptr;
+ // GCP-style PubSub API: topics/subscriptions/notification are reserved bucket names
if (s->init_state.url_bucket == "topics") {
handler = new RGWHandler_REST_PSTopic_S3(auth_registry);
- }
-
- if (s->init_state.url_bucket == "subscriptions") {
+ } else if (s->init_state.url_bucket == "subscriptions") {
handler = new RGWHandler_REST_PSSub_S3(auth_registry);
- }
-
- if (s->init_state.url_bucket == "notifications") {
+ } else if (s->init_state.url_bucket == "notifications") {
+ handler = new RGWHandler_REST_PSNotifs_GCP(auth_registry);
+ } else {
+ // S3-compliant PubSub API: uses: <bucket name>?notification
handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
}