From eb213bb302495098091061822c0c0bcbb958dcb9 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 31 Jul 2018 17:48:34 -0700 Subject: [PATCH] rgw: pubsub: filter events Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 5 ++- src/rgw/rgw_pubsub.cc | 12 +++++-- src/rgw/rgw_pubsub.h | 26 +++++++++++++-- src/rgw/rgw_sync_module_pubsub.cc | 46 +++++++++++++++++--------- src/rgw/rgw_sync_module_pubsub_rest.cc | 9 ++++- 5 files changed, 77 insertions(+), 21 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index ff78b96838768..b028acffab545 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2871,6 +2871,7 @@ int main(int argc, const char **argv) string sub_dest_bucket; string sub_push_endpoint; string event_id; + set event_types; for (std::vector::iterator i = args.begin(); i != args.end(); ) { if (ceph_argparse_double_dash(args, i)) { @@ -3204,6 +3205,8 @@ int main(int argc, const char **argv) sub_push_endpoint = val; } else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) { event_id = val; + } else if (ceph_argparse_witharg(args, i, &val, "--event-type", "--event-types", (char*)NULL)) { + get_str_set(val, ",", event_types); } else if (strncmp(*i, "-", 1) == 0) { cerr << "ERROR: invalid flag " << *i << std::endl; return EINVAL; @@ -7935,7 +7938,7 @@ next: } auto b = ups.get_bucket(bucket_info.bucket); - ret = b->create_notification(topic_name); + ret = b->create_notification(topic_name, event_types); if (ret < 0) { cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl; return -ret; diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index f8e968d4953ab..412aea2cf540f 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -21,6 +21,12 @@ void rgw_pubsub_topic::dump(Formatter *f) const encode_json("name", name, f); } +void rgw_pubsub_topic_filter::dump(Formatter *f) const +{ + encode_json("topic", topic, f); + encode_json("events", events, f); +} + void rgw_pubsub_topic_subs::dump(Formatter *f) const { encode_json("topic", topic, f); @@ -140,7 +146,7 @@ int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result) } -int RGWUserPubSub::Bucket::create_notification(const string& topic_name) +int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const set& events) { rgw_pubsub_topic_subs user_topic_info; RGWRados *store = ps->store; @@ -160,7 +166,9 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name) return ret; } - bucket_topics.topics[topic_name] = user_topic_info.topic; + auto& topic_filter = bucket_topics.topics[topic_name]; + topic_filter.topic = user_topic_info.topic; + topic_filter.events = events; ret = write_topics(bucket_topics, &objv_tracker); if (ret < 0) { diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 2871f99903eb2..9f12cf1f7e06c 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -141,8 +141,30 @@ struct rgw_pubsub_topic_subs { }; WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs) +struct rgw_pubsub_topic_filter { + rgw_pubsub_topic topic; + set events; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(topic, bl); + encode(events, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(topic, bl); + decode(events, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter) + struct rgw_pubsub_bucket_topics { - map topics; + map topics; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); @@ -234,7 +256,7 @@ public: } int get_topics(rgw_pubsub_bucket_topics *result); - int create_notification(const string& topic_name); + int create_notification(const string& topic_name, const set& events); int remove_notification(const string& topic_name); }; diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index cf49792168e8f..0e21b60f3341e 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -307,23 +307,28 @@ struct PSConfig { }; enum RGWPubSubEventType { - EVENT_UNKNOWN = 0, + UNKNOWN_EVENT = 0, OBJECT_CREATE = 1, OBJECT_DELETE = 2, DELETE_MARKER_CREATE = 3, }; -static const char *event_name(const RGWPubSubEventType& val) +#define EVENT_NAME_OBJECT_CREATE "OBJECT_CREATE" +#define EVENT_NAME_OBJECT_DELETE "OBJECT_DELETE" +#define EVENT_NAME_OBJECT_DELETE_MARKER_CREATE "DELETE_MARKER_CREATE" +#define EVENT_NAME_UNKNOWN "UNKNOWN_EVENT" + +static const char *get_event_name(const RGWPubSubEventType& val) { switch (val) { case OBJECT_CREATE: - return "OBJECT_CREATE"; + return EVENT_NAME_OBJECT_CREATE; case OBJECT_DELETE: - return "OBJECT_DELETE"; + return EVENT_NAME_OBJECT_DELETE; case DELETE_MARKER_CREATE: - return "DELETE_MARKER_CREATE"; + return EVENT_NAME_OBJECT_DELETE_MARKER_CREATE; default: - return "EVENT_UNKNOWN"; + return "EVENT_NAME_UNKNOWN"; }; } @@ -385,12 +390,12 @@ static void make_event_ref(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, const ceph::real_time& mtime, const std::vector > *attrs, - RGWPubSubEventType event_type, + const string& event_name, EventRef *event) { *event = std::make_shared(); EventRef& e = *event; - e->event = event_name(event_type); + e->event = event_name; e->source = bucket.name + "/" + key.name; e->timestamp = real_clock::now(); @@ -986,6 +991,7 @@ class RGWPSFindBucketTopicsCR : public RGWCoroutine { rgw_user owner; rgw_bucket bucket; rgw_obj_key key; + string event_name; RGWUserPubSub ups; @@ -1000,12 +1006,14 @@ public: const rgw_user& _owner, const rgw_bucket& _bucket, const rgw_obj_key& _key, + const string& _event_name, TopicsRef *_topics) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), env(_env), owner(_owner), bucket(_bucket), key(_key), + event_name(_event_name), ups(_sync_env->store, owner), topics(_topics) { *topics = std::make_shared >(); @@ -1042,7 +1050,12 @@ public: } for (auto& titer : bucket_topics.topics) { - auto& info = titer.second; + auto& topic_filter = titer.second; + auto& info = topic_filter.topic; + if (!topic_filter.events.empty() && + topic_filter.events.find(event_name) == topic_filter.events.end()) { + continue; + } shared_ptr tc = std::make_shared(); tc->name = info.name; tc->subs = user_topics.topics[info.name].subs; @@ -1160,7 +1173,7 @@ public: make_event_ref(sync_env->cct, bucket_info.bucket, key, mtime, &attrs, - OBJECT_CREATE, &event); + EVENT_NAME_OBJECT_CREATE, &event); } yield call(new RGWPSHandleObjEvent(sync_env, env, bucket_info.owner, event, topics)); @@ -1216,7 +1229,10 @@ public: int operate() override { reenter(this) { - yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner, bucket_info.bucket, key, &topics)); + yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner, + bucket_info.bucket, key, + EVENT_NAME_OBJECT_CREATE, + &topics)); if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; return set_cr_error(retcode); @@ -1242,7 +1258,7 @@ class RGWPSGenericObjEventCBCR : public RGWCoroutine { rgw_bucket bucket; rgw_obj_key key; ceph::real_time mtime; - RGWPubSubEventType event_type; + string event_name; EventRef event; TopicsRef topics; public: @@ -1255,12 +1271,12 @@ public: owner(_bucket_info.owner), bucket(_bucket_info.bucket), key(_key), - mtime(_mtime), event_type(_event_type) {} + mtime(_mtime), event_name(get_event_name(_event_type)) {} int operate() override { reenter(this) { ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl; - yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, &topics)); + yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_name, &topics)); if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; return set_cr_error(retcode); @@ -1273,7 +1289,7 @@ public: make_event_ref(sync_env->cct, bucket, key, mtime, nullptr, - event_type, &event); + event_name, &event); } yield call(new RGWPSHandleObjEvent(sync_env, env, owner, event, topics)); diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc index c9749cc1b1df7..96b8f0509292b 100644 --- a/src/rgw/rgw_sync_module_pubsub_rest.cc +++ b/src/rgw/rgw_sync_module_pubsub_rest.cc @@ -637,6 +637,8 @@ class RGWPSCreateNotifOp : public RGWOp { protected: std::unique_ptr ups; string topic_name; + set events; + string bucket_name; RGWBucketInfo bucket_info; @@ -683,7 +685,7 @@ void RGWPSCreateNotifOp::execute() ups = make_unique(store, s->owner.get_id()); auto b = ups->get_bucket(bucket_info.bucket); - op_ret = b->create_notification(topic_name); + op_ret = b->create_notification(topic_name, events); if (op_ret < 0) { ldout(s->cct, 20) << "failed to create notification, ret=" << op_ret << dendl; return; @@ -701,6 +703,11 @@ public: ldout(s->cct, 20) << "param 'topic' not provided" << dendl; return -EINVAL; } + + string events_str = s->info.args.get("events", &exists); + if (exists) { + get_str_set(events_str, ",", events); + } return notif_bucket_path(s->object.name, &bucket_name); } }; -- 2.39.5