From: Yehuda Sadeh Date: Mon, 2 Jul 2018 23:57:23 +0000 (-0700) Subject: rgw: define pubsub user structures X-Git-Tag: v14.1.0~616^2~53 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a47d07cc03514d7c9044401e5d69b8d12d363260;p=ceph.git rgw: define pubsub user structures Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc new file mode 100644 index 000000000000..e147516483d3 --- /dev/null +++ b/src/rgw/rgw_pubsub.cc @@ -0,0 +1,400 @@ +#include "rgw_rados.h" +#include "rgw_pubsub.h" +#include "rgw_tools.h" + +#define dout_subsys ceph_subsys_rgw + +void encode_json(const char *name, const RGWPubSubEventType& val, Formatter *f) +{ + switch (val) { + case EVENT_UNKNOWN: + encode_json(name, "EVENT_UNKNOWN", f); + break; + case OBJECT_CREATE: + encode_json(name, "OBJECT_CREATE", f); + break; + case OBJECT_DELETE: + encode_json(name, "OBJECT_DELETE", f); + break; + case DELETE_MARKER_CREATE: + encode_json(name, "DELETE_MARKER_CREATE", f); + break; + }; +} + + +void rgw_pubsub_event::dump(Formatter *f) const +{ + encode_json("id", id, f); + { + Formatter::ObjectSection s(*f, "bucket"); + encode_json("name", bucket.name, f); + encode_json("id", bucket.bucket_id, f); + } + { + Formatter::ObjectSection s(*f, "object"); + encode_json("name", key.name, f); + encode_json("version-id", key.instance, f); + } + + utime_t mt(mtime); + encode_json("mtime", mt, f); + encode_json("event", event, f); + utime_t ut(timestamp); + encode_json("timestamp", ut, f); + + { + Formatter::ObjectSection s(*f, "attrs"); + for (auto& attr : attrs) { + encode_json(attr.first.c_str(), attr.second.c_str(), f); + } + } +} + +void rgw_pubsub_user_topic::dump(Formatter *f) const +{ + encode_json("name", name, f); + encode_json("bucket", bucket, f); +} + +void rgw_pubsub_user_topic_info::dump(Formatter *f) const +{ + encode_json("user", user, f); + encode_json("topic", topic, f); +} + +void rgw_pubsub_user_topics::dump(Formatter *f) const +{ + Formatter::ObjectSection s(*f, "topics"); + for (auto& t : topics) { + encode_json(t.first.c_str(), t.second, f); + } +} + +void rgw_pubsub_user_sub_dest::dump(Formatter *f) const +{ + encode_json("bucket_name", bucket_name, f); + encode_json("oid_prefix", oid_prefix, f); + encode_json("push_endpoint", push_endpoint, f); +} + +void rgw_pubsub_user_sub_config::dump(Formatter *f) const +{ + encode_json("user", user, f); + encode_json("name", name, f); + encode_json("topic", topic, f); + encode_json("dest", dest, f); +} + +static string pubsub_user_oid_prefix = "pubsub.user."; + + +class RGWUserPubSub +{ + RGWRados *store; + rgw_user user; + RGWObjectCtx obj_ctx; + + template + int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker); + + template + int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker); + + int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker); +public: + RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store), + user(_user), + obj_ctx(store) {} + + string user_meta_oid() const { + return pubsub_user_oid_prefix + user.to_str(); + } + + string bucket_meta_oid(const rgw_bucket& bucket) const { + return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id; + } + + string sub_meta_oid(const string& name) const { + return pubsub_user_oid_prefix + user.to_str() + ".sub." + name; + } + + void get_user_meta_obj(rgw_raw_obj *obj) const { + *obj = rgw_raw_obj(store->get_zone_params().log_pool, user_meta_oid()); + } + + void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const { + *obj = rgw_raw_obj(store->get_zone_params().log_pool, bucket_meta_oid(bucket)); + } + + void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const { + *obj = rgw_raw_obj(store->get_zone_params().log_pool, sub_meta_oid(name)); + } + + int get_topics(rgw_pubsub_user_topics *result); + int get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result); + int create_topic(const string& name, const rgw_bucket& bucket); + int remove_topic(const string& name); + int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest); + int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest); +}; + +template +int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker) +{ + bufferlist bl; + int ret = rgw_get_system_obj(store, obj_ctx, + obj.pool, obj.oid, + bl, + objv_tracker, + nullptr, nullptr, nullptr); + if (ret < 0) { + return ret; + } + + auto iter = bl.cbegin(); + try { + decode(*result, iter); + } catch (buffer::error& err) { + ldout(store->ctx(), 0) << "ERROR: failed to decode info, caught buffer::error" << dendl; + return -EIO; + } + + return 0; +} + +template +int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker) +{ + bufferlist bl; + encode(info, bl); + + int ret = rgw_put_system_obj(store, obj.pool, obj.oid, + bl, false, objv_tracker, + real_time()); + if (ret < 0) { + return ret; + } + + return 0; +} + +int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) +{ + int ret = rgw_delete_system_obj(store, obj.pool, obj.oid, objv_tracker); + if (ret < 0) { + return ret; + } + + return 0; +} + +int RGWUserPubSub::get_topics(rgw_pubsub_user_topics *result) +{ + rgw_raw_obj obj; + get_user_meta_obj(&obj); + + RGWObjVersionTracker objv_tracker; + int ret = read(obj, result, &objv_tracker); + if (ret < 0 && ret != -ENOENT) { + ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + return ret; + } + return 0; +} + +int RGWUserPubSub::get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result) +{ + rgw_raw_obj obj; + get_bucket_meta_obj(bucket, &obj); + + RGWObjVersionTracker objv_tracker; + int ret = read(obj, result, &objv_tracker); + if (ret < 0 && ret != -ENOENT) { + ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + return ret; + } + return 0; +} + + +int RGWUserPubSub::create_topic(const string& name, const rgw_bucket& bucket) +{ + rgw_raw_obj obj; + get_user_meta_obj(&obj); + + RGWObjVersionTracker objv_tracker; + rgw_pubsub_user_topics topics; + + int ret = read(obj, &topics, &objv_tracker); + if (ret < 0 && ret != -ENOENT) { + ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + return ret; + } + + rgw_pubsub_user_topic_info new_topic; + new_topic.user = user; + new_topic.topic.name = name; + new_topic.topic.bucket = bucket; + + topics.topics[name] = new_topic; + + ret = write(obj, topics, &objv_tracker); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; + return ret; + } + + rgw_pubsub_user_topics bucket_topics; + for (auto& t : topics.topics) { + if (t.second.topic.bucket == bucket) { + bucket_topics.topics.insert(t); + } + } + + rgw_raw_obj bobj; + get_bucket_meta_obj(bucket, &bobj); + ret = write(obj, bucket_topics, nullptr); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; + return ret; + } + return 0; +} + +int RGWUserPubSub::remove_topic(const string& name) +{ + rgw_raw_obj obj; + get_user_meta_obj(&obj); + + RGWObjVersionTracker objv_tracker; + rgw_pubsub_user_topics topics; + + int ret = read(obj, &topics, &objv_tracker); + if (ret < 0 && ret != -ENOENT) { + ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + return ret; + } + + rgw_bucket bucket; + auto t = topics.topics.find(name); + if (t != topics.topics.end()) { + bucket = t->second.topic.bucket; + } + + topics.topics.erase(name); + + ret = write(obj, topics, &objv_tracker); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; + return ret; + } + + if (bucket.name.empty()) { + return 0; + } + + rgw_pubsub_user_topics bucket_topics; + for (auto& t : topics.topics) { + if (t.second.topic.bucket == bucket) { + bucket_topics.topics.insert(t); + } + } + + rgw_raw_obj bobj; + get_bucket_meta_obj(bucket, &bobj); + ret = write(obj, bucket_topics, nullptr); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; + return ret; + } + return 0; +} + +int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest) +{ + rgw_raw_obj obj; + get_user_meta_obj(&obj); + + RGWObjVersionTracker objv_tracker; + rgw_pubsub_user_topics topics; + + int ret = read(obj, &topics, &objv_tracker); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + return ret; + } + + auto iter = topics.topics.find(name); + if (iter == topics.topics.end()) { + ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl; + return -ENOENT; + } + + auto& t = iter->second; + + rgw_pubsub_user_sub_config sub_conf; + + sub_conf.user = user; + sub_conf.name = name; + sub_conf.topic = topic; + sub_conf.dest = dest; + + t.subs.insert(name); + + ret = write(obj, topics, &objv_tracker); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; + return ret; + } + + rgw_raw_obj bobj; + get_sub_meta_obj(name, &bobj); + ret = write(obj, sub_conf, nullptr); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl; + return ret; + } + return 0; +} + +int RGWUserPubSub::remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest) +{ + rgw_raw_obj obj; + get_user_meta_obj(&obj); + + RGWObjVersionTracker objv_tracker; + rgw_pubsub_user_topics topics; + + int ret = read(obj, &topics, &objv_tracker); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + } + + if (ret >= 0) { + auto iter = topics.topics.find(name); + if (iter == topics.topics.end()) { + ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl; + } else { + auto& t = iter->second; + + t.subs.erase(name); + + ret = write(obj, topics, &objv_tracker); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; + return ret; + } + } + } + + rgw_raw_obj sobj; + get_sub_meta_obj(name, &sobj); + ret = remove(obj, nullptr); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl; + return ret; + } + return 0; +} + diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h new file mode 100644 index 000000000000..2239ab2f9c6c --- /dev/null +++ b/src/rgw/rgw_pubsub.h @@ -0,0 +1,186 @@ +#ifndef CEPH_RGW_PUBSUB_H +#define CEPH_RGW_PUBSUB_H + +#include "rgw_common.h" + + +enum RGWPubSubEventType { + EVENT_UNKNOWN = 0, + OBJECT_CREATE = 1, + OBJECT_DELETE = 2, + DELETE_MARKER_CREATE = 3, +}; + +struct rgw_pubsub_event { + string id; + rgw_bucket bucket; + rgw_obj_key key; + ceph::real_time mtime; + + RGWPubSubEventType event; + ceph::real_time timestamp; + + std::vector > attrs; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(bucket, bl); + encode(key, bl); + encode(mtime, bl); + uint32_t e = (uint32_t)event; + encode(e, bl); + encode(timestamp, bl); + encode(attrs, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + decode(bucket, bl); + decode(key, bl); + decode(mtime, bl); + uint32_t e; + decode(e, bl); + event = (RGWPubSubEventType)e; + decode(timestamp, bl); + decode(attrs, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(rgw_pubsub_event) + +struct rgw_pubsub_user_sub_dest { + string bucket_name; + string oid_prefix; + string push_endpoint; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(bucket_name, bl); + encode(oid_prefix, bl); + encode(push_endpoint, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(bucket_name, bl); + decode(oid_prefix, bl); + decode(push_endpoint, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(rgw_pubsub_user_sub_dest) + +struct rgw_pubsub_user_sub_config { + rgw_user user; + string name; + string topic; + rgw_pubsub_user_sub_dest dest; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(user, bl); + encode(name, bl); + encode(topic, bl); + encode(dest, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(user, bl); + decode(name, bl); + decode(topic, bl); + decode(dest, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(rgw_pubsub_user_sub_config) + +struct rgw_pubsub_user_topic { + string name; + rgw_bucket bucket; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(name, bl); + encode(bucket, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(name, bl); + decode(bucket, bl); + DECODE_FINISH(bl); + } + + const string& to_str() const { + return name; + } + void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(rgw_pubsub_user_topic) + +struct rgw_pubsub_user_topic_info { + rgw_user user; + rgw_pubsub_user_topic topic; + set subs; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(user, bl); + encode(topic, bl); + encode(subs, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(user, bl); + decode(topic, bl); + decode(subs, bl); + DECODE_FINISH(bl); + } + + string to_str() const { + return user.to_str() + "/" + topic.name; + } + + void dump(Formatter *f) const; + + bool operator<(const rgw_pubsub_user_topic& t) const { + return to_str().compare(t.to_str()); + } +}; +WRITE_CLASS_ENCODER(rgw_pubsub_user_topic_info) + +struct rgw_pubsub_user_topics { + map topics; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(topics, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(topics, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(rgw_pubsub_user_topics) + +#endif