]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: pubsub: filter events
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 1 Aug 2018 00:48:34 +0000 (17:48 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:43 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_sync_module_pubsub.cc
src/rgw/rgw_sync_module_pubsub_rest.cc

index ff78b9683876833aefb9b8fe2a5f17d397dbf3e3..b028acffab545c8deeefe7c4715f9b5db856fd68 100644 (file)
@@ -2871,6 +2871,7 @@ int main(int argc, const char **argv)
   string sub_dest_bucket;
   string sub_push_endpoint;
   string event_id;
+  set<string, ltstr_nocase> event_types;
 
   for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
     if (ceph_argparse_double_dash(args, i)) {
@@ -3204,6 +3205,8 @@ int main(int argc, const char **argv)
       sub_push_endpoint = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) {
       event_id = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--event-type", "--event-types", (char*)NULL)) {
+      get_str_set(val, ",", event_types);
     } else if (strncmp(*i, "-", 1) == 0) {
       cerr << "ERROR: invalid flag " << *i << std::endl;
       return EINVAL;
@@ -7935,7 +7938,7 @@ next:
     }
 
     auto b = ups.get_bucket(bucket_info.bucket);
-    ret = b->create_notification(topic_name);
+    ret = b->create_notification(topic_name, event_types);
     if (ret < 0) {
       cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl;
       return -ret;
index f8e968d4953ab6cb8c2cf5e98b0ead7cc26ff61f..412aea2cf540fe893afc4409956b718de56ef832 100644 (file)
@@ -21,6 +21,12 @@ void rgw_pubsub_topic::dump(Formatter *f) const
   encode_json("name", name, f);
 }
 
+void rgw_pubsub_topic_filter::dump(Formatter *f) const
+{
+  encode_json("topic", topic, f);
+  encode_json("events", events, f);
+}
+
 void rgw_pubsub_topic_subs::dump(Formatter *f) const
 {
   encode_json("topic", topic, f);
@@ -140,7 +146,7 @@ int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
 }
 
 
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name)
+int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const set<string, ltstr_nocase>& events)
 {
   rgw_pubsub_topic_subs user_topic_info;
   RGWRados *store = ps->store;
@@ -160,7 +166,9 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name)
     return ret;
   }
 
-  bucket_topics.topics[topic_name] = user_topic_info.topic;
+  auto& topic_filter = bucket_topics.topics[topic_name];
+  topic_filter.topic = user_topic_info.topic;
+  topic_filter.events = events;
 
   ret = write_topics(bucket_topics, &objv_tracker);
   if (ret < 0) {
index 2871f99903eb26636f8ee1a4d33cf078a9714524..9f12cf1f7e06cd4f41f33e9e0b4e043c08dafa5a 100644 (file)
@@ -141,8 +141,30 @@ struct rgw_pubsub_topic_subs {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
 
+struct rgw_pubsub_topic_filter {
+  rgw_pubsub_topic topic;
+  set<string, ltstr_nocase> events;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(topic, bl);
+    encode(events, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(topic, bl);
+    decode(events, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
+
 struct rgw_pubsub_bucket_topics {
-  map<string, rgw_pubsub_topic> topics;
+  map<string, rgw_pubsub_topic_filter> topics;
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
@@ -234,7 +256,7 @@ public:
     }
 
     int get_topics(rgw_pubsub_bucket_topics *result);
-    int create_notification(const string& topic_name);
+    int create_notification(const string& topic_name, const set<string, ltstr_nocase>& events);
     int remove_notification(const string& topic_name);
   };
 
index cf49792168e8fcb743bb4ec2ddf5379782d51cdb..0e21b60f3341e2cd2f07218b644f6de8b1a20fab 100644 (file)
@@ -307,23 +307,28 @@ struct PSConfig {
 };
 
 enum RGWPubSubEventType {
-  EVENT_UNKNOWN        = 0,
+  UNKNOWN_EVENT        = 0,
   OBJECT_CREATE        = 1,
   OBJECT_DELETE        = 2,
   DELETE_MARKER_CREATE = 3,
 };
 
-static const char *event_name(const RGWPubSubEventType& val)
+#define EVENT_NAME_OBJECT_CREATE               "OBJECT_CREATE"
+#define EVENT_NAME_OBJECT_DELETE               "OBJECT_DELETE"
+#define EVENT_NAME_OBJECT_DELETE_MARKER_CREATE "DELETE_MARKER_CREATE"
+#define EVENT_NAME_UNKNOWN                    "UNKNOWN_EVENT"
+
+static const char *get_event_name(const RGWPubSubEventType& val)
 {
   switch (val) {
     case OBJECT_CREATE:
-      return "OBJECT_CREATE";
+      return EVENT_NAME_OBJECT_CREATE;
     case OBJECT_DELETE:
-      return "OBJECT_DELETE";
+      return EVENT_NAME_OBJECT_DELETE;
     case DELETE_MARKER_CREATE:
-      return "DELETE_MARKER_CREATE";
+      return EVENT_NAME_OBJECT_DELETE_MARKER_CREATE;
     default:
-      return "EVENT_UNKNOWN";
+      return "EVENT_NAME_UNKNOWN";
   };
 }
 
@@ -385,12 +390,12 @@ static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
                        const rgw_obj_key& key,
                        const ceph::real_time& mtime,
                        const std::vector<std::pair<std::string, std::string> > *attrs,
-                       RGWPubSubEventType event_type,
+                       const string& event_name,
                        EventRef *event) {
   *event = std::make_shared<rgw_pubsub_event>();
 
   EventRef& e = *event;
-  e->event = event_name(event_type);
+  e->event = event_name;
   e->source = bucket.name + "/" + key.name;
   e->timestamp = real_clock::now();
 
@@ -986,6 +991,7 @@ class RGWPSFindBucketTopicsCR : public RGWCoroutine {
   rgw_user owner;
   rgw_bucket bucket;
   rgw_obj_key key;
+  string event_name;
 
   RGWUserPubSub ups;
 
@@ -1000,12 +1006,14 @@ public:
                       const rgw_user& _owner,
                       const rgw_bucket& _bucket,
                       const rgw_obj_key& _key,
+                      const string& _event_name,
                       TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
                                                           sync_env(_sync_env),
                                                           env(_env),
                                                           owner(_owner),
                                                           bucket(_bucket),
                                                           key(_key),
+                                                          event_name(_event_name),
                                                           ups(_sync_env->store, owner),
                                                           topics(_topics) {
     *topics = std::make_shared<vector<PSTopicConfigRef> >();
@@ -1042,7 +1050,12 @@ public:
       }
 
       for (auto& titer : bucket_topics.topics) {
-        auto& info = titer.second;
+        auto& topic_filter = titer.second;
+        auto& info = topic_filter.topic;
+        if (!topic_filter.events.empty() &&
+            topic_filter.events.find(event_name) == topic_filter.events.end()) {
+          continue;
+        }
         shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
         tc->name = info.name;
         tc->subs = user_topics.topics[info.name].subs;
@@ -1160,7 +1173,7 @@ public:
         make_event_ref(sync_env->cct,
                        bucket_info.bucket, key,
                        mtime, &attrs,
-                       OBJECT_CREATE, &event);
+                       EVENT_NAME_OBJECT_CREATE, &event);
       }
 
       yield call(new RGWPSHandleObjEvent(sync_env, env, bucket_info.owner, event, topics));
@@ -1216,7 +1229,10 @@ public:
 
   int operate() override {
     reenter(this) {
-      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner, bucket_info.bucket, key, &topics));
+      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner,
+                                             bucket_info.bucket, key,
+                                             EVENT_NAME_OBJECT_CREATE,
+                                             &topics));
       if (retcode < 0) {
         ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
         return set_cr_error(retcode);
@@ -1242,7 +1258,7 @@ class RGWPSGenericObjEventCBCR : public RGWCoroutine {
   rgw_bucket bucket;
   rgw_obj_key key;
   ceph::real_time mtime;
-  RGWPubSubEventType event_type;
+  string event_name;
   EventRef event;
   TopicsRef topics;
 public:
@@ -1255,12 +1271,12 @@ public:
                                                              owner(_bucket_info.owner),
                                                              bucket(_bucket_info.bucket),
                                                              key(_key),
-                                                             mtime(_mtime), event_type(_event_type) {}
+                                                             mtime(_mtime), event_name(get_event_name(_event_type)) {}
   int operate() override {
     reenter(this) {
       ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
                                << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
-      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, &topics));
+      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_name, &topics));
       if (retcode < 0) {
         ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
         return set_cr_error(retcode);
@@ -1273,7 +1289,7 @@ public:
         make_event_ref(sync_env->cct,
                        bucket, key,
                        mtime, nullptr,
-                       event_type, &event);
+                       event_name, &event);
       }
 
       yield call(new RGWPSHandleObjEvent(sync_env, env, owner, event, topics));
index c9749cc1b1df71b7ba68b3683ed17330a64334d4..96b8f0509292b646120e5f7defc7ab94d6c8f682 100644 (file)
@@ -637,6 +637,8 @@ class RGWPSCreateNotifOp : public RGWOp {
 protected:
   std::unique_ptr<RGWUserPubSub> ups;
   string topic_name;
+  set<string, ltstr_nocase> events;
+
   string bucket_name;
   RGWBucketInfo bucket_info;
 
@@ -683,7 +685,7 @@ void RGWPSCreateNotifOp::execute()
 
   ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
   auto b = ups->get_bucket(bucket_info.bucket);
-  op_ret = b->create_notification(topic_name);
+  op_ret = b->create_notification(topic_name, events);
   if (op_ret < 0) {
     ldout(s->cct, 20) << "failed to create notification, ret=" << op_ret << dendl;
     return;
@@ -701,6 +703,11 @@ public:
       ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
       return -EINVAL;
     }
+
+    string events_str = s->info.args.get("events", &exists);
+    if (exists) {
+      get_str_set(events_str, ",", events);
+    }
     return notif_bucket_path(s->object.name, &bucket_name);
   }
 };