string sub_dest_bucket;
string sub_push_endpoint;
string event_id;
+ set<string, ltstr_nocase> event_types;
for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
if (ceph_argparse_double_dash(args, i)) {
sub_push_endpoint = val;
} else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) {
event_id = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--event-type", "--event-types", (char*)NULL)) {
+ get_str_set(val, ",", event_types);
} else if (strncmp(*i, "-", 1) == 0) {
cerr << "ERROR: invalid flag " << *i << std::endl;
return EINVAL;
}
auto b = ups.get_bucket(bucket_info.bucket);
- ret = b->create_notification(topic_name);
+ ret = b->create_notification(topic_name, event_types);
if (ret < 0) {
cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
encode_json("name", name, f);
}
+void rgw_pubsub_topic_filter::dump(Formatter *f) const
+{
+ encode_json("topic", topic, f);
+ encode_json("events", events, f);
+}
+
void rgw_pubsub_topic_subs::dump(Formatter *f) const
{
encode_json("topic", topic, f);
}
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name)
+int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const set<string, ltstr_nocase>& events)
{
rgw_pubsub_topic_subs user_topic_info;
RGWRados *store = ps->store;
return ret;
}
- bucket_topics.topics[topic_name] = user_topic_info.topic;
+ auto& topic_filter = bucket_topics.topics[topic_name];
+ topic_filter.topic = user_topic_info.topic;
+ topic_filter.events = events;
ret = write_topics(bucket_topics, &objv_tracker);
if (ret < 0) {
};
WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
+struct rgw_pubsub_topic_filter {
+ rgw_pubsub_topic topic;
+ set<string, ltstr_nocase> events;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(topic, bl);
+ encode(events, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(topic, bl);
+ decode(events, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
+
struct rgw_pubsub_bucket_topics {
- map<string, rgw_pubsub_topic> topics;
+ map<string, rgw_pubsub_topic_filter> topics;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
}
int get_topics(rgw_pubsub_bucket_topics *result);
- int create_notification(const string& topic_name);
+ int create_notification(const string& topic_name, const set<string, ltstr_nocase>& events);
int remove_notification(const string& topic_name);
};
};
enum RGWPubSubEventType {
- EVENT_UNKNOWN = 0,
+ UNKNOWN_EVENT = 0,
OBJECT_CREATE = 1,
OBJECT_DELETE = 2,
DELETE_MARKER_CREATE = 3,
};
-static const char *event_name(const RGWPubSubEventType& val)
+#define EVENT_NAME_OBJECT_CREATE "OBJECT_CREATE"
+#define EVENT_NAME_OBJECT_DELETE "OBJECT_DELETE"
+#define EVENT_NAME_OBJECT_DELETE_MARKER_CREATE "DELETE_MARKER_CREATE"
+#define EVENT_NAME_UNKNOWN "UNKNOWN_EVENT"
+
+static const char *get_event_name(const RGWPubSubEventType& val)
{
switch (val) {
case OBJECT_CREATE:
- return "OBJECT_CREATE";
+ return EVENT_NAME_OBJECT_CREATE;
case OBJECT_DELETE:
- return "OBJECT_DELETE";
+ return EVENT_NAME_OBJECT_DELETE;
case DELETE_MARKER_CREATE:
- return "DELETE_MARKER_CREATE";
+ return EVENT_NAME_OBJECT_DELETE_MARKER_CREATE;
default:
- return "EVENT_UNKNOWN";
+ return "EVENT_NAME_UNKNOWN";
};
}
const rgw_obj_key& key,
const ceph::real_time& mtime,
const std::vector<std::pair<std::string, std::string> > *attrs,
- RGWPubSubEventType event_type,
+ const string& event_name,
EventRef *event) {
*event = std::make_shared<rgw_pubsub_event>();
EventRef& e = *event;
- e->event = event_name(event_type);
+ e->event = event_name;
e->source = bucket.name + "/" + key.name;
e->timestamp = real_clock::now();
rgw_user owner;
rgw_bucket bucket;
rgw_obj_key key;
+ string event_name;
RGWUserPubSub ups;
const rgw_user& _owner,
const rgw_bucket& _bucket,
const rgw_obj_key& _key,
+ const string& _event_name,
TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
env(_env),
owner(_owner),
bucket(_bucket),
key(_key),
+ event_name(_event_name),
ups(_sync_env->store, owner),
topics(_topics) {
*topics = std::make_shared<vector<PSTopicConfigRef> >();
}
for (auto& titer : bucket_topics.topics) {
- auto& info = titer.second;
+ auto& topic_filter = titer.second;
+ auto& info = topic_filter.topic;
+ if (!topic_filter.events.empty() &&
+ topic_filter.events.find(event_name) == topic_filter.events.end()) {
+ continue;
+ }
shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
tc->name = info.name;
tc->subs = user_topics.topics[info.name].subs;
make_event_ref(sync_env->cct,
bucket_info.bucket, key,
mtime, &attrs,
- OBJECT_CREATE, &event);
+ EVENT_NAME_OBJECT_CREATE, &event);
}
yield call(new RGWPSHandleObjEvent(sync_env, env, bucket_info.owner, event, topics));
int operate() override {
reenter(this) {
- yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner, bucket_info.bucket, key, &topics));
+ yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner,
+ bucket_info.bucket, key,
+ EVENT_NAME_OBJECT_CREATE,
+ &topics));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
return set_cr_error(retcode);
rgw_bucket bucket;
rgw_obj_key key;
ceph::real_time mtime;
- RGWPubSubEventType event_type;
+ string event_name;
EventRef event;
TopicsRef topics;
public:
owner(_bucket_info.owner),
bucket(_bucket_info.bucket),
key(_key),
- mtime(_mtime), event_type(_event_type) {}
+ mtime(_mtime), event_name(get_event_name(_event_type)) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
<< " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
- yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, &topics));
+ yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_name, &topics));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
return set_cr_error(retcode);
make_event_ref(sync_env->cct,
bucket, key,
mtime, nullptr,
- event_type, &event);
+ event_name, &event);
}
yield call(new RGWPSHandleObjEvent(sync_env, env, owner, event, topics));
protected:
std::unique_ptr<RGWUserPubSub> ups;
string topic_name;
+ set<string, ltstr_nocase> events;
+
string bucket_name;
RGWBucketInfo bucket_info;
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
auto b = ups->get_bucket(bucket_info.bucket);
- op_ret = b->create_notification(topic_name);
+ op_ret = b->create_notification(topic_name, events);
if (op_ret < 0) {
ldout(s->cct, 20) << "failed to create notification, ret=" << op_ret << dendl;
return;
ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
return -EINVAL;
}
+
+ string events_str = s->info.args.get("events", &exists);
+ if (exists) {
+ get_str_set(events_str, ",", events);
+ }
return notif_bucket_path(s->object.name, &bucket_name);
}
};