]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: define pubsub user structures
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 2 Jul 2018 23:57:23 +0000 (16:57 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_pubsub.cc [new file with mode: 0644]
src/rgw/rgw_pubsub.h [new file with mode: 0644]

diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc
new file mode 100644 (file)
index 0000000..e147516
--- /dev/null
@@ -0,0 +1,400 @@
+#include "rgw_rados.h"
+#include "rgw_pubsub.h"
+#include "rgw_tools.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+void encode_json(const char *name, const RGWPubSubEventType& val, Formatter *f)
+{
+  switch (val) {
+    case EVENT_UNKNOWN:
+      encode_json(name, "EVENT_UNKNOWN", f);
+      break;
+    case OBJECT_CREATE:
+      encode_json(name, "OBJECT_CREATE", f);
+      break;
+    case OBJECT_DELETE:
+      encode_json(name, "OBJECT_DELETE", f);
+      break;
+    case DELETE_MARKER_CREATE:
+      encode_json(name, "DELETE_MARKER_CREATE", f);
+      break;
+  };
+}
+
+
+void rgw_pubsub_event::dump(Formatter *f) const
+{
+  encode_json("id", id, f);
+  {
+    Formatter::ObjectSection s(*f, "bucket");
+    encode_json("name", bucket.name, f);
+    encode_json("id", bucket.bucket_id, f);
+  }
+  {
+    Formatter::ObjectSection s(*f, "object");
+    encode_json("name", key.name, f);
+    encode_json("version-id", key.instance, f);
+  }
+
+  utime_t mt(mtime);
+  encode_json("mtime", mt, f);
+  encode_json("event", event, f);
+  utime_t ut(timestamp);
+  encode_json("timestamp", ut, f);
+
+  {
+    Formatter::ObjectSection s(*f, "attrs");
+    for (auto& attr : attrs) {
+      encode_json(attr.first.c_str(), attr.second.c_str(), f);
+    }
+  }
+}
+
+void rgw_pubsub_user_topic::dump(Formatter *f) const
+{
+  encode_json("name", name, f);
+  encode_json("bucket", bucket, f);
+}
+
+void rgw_pubsub_user_topic_info::dump(Formatter *f) const
+{
+  encode_json("user", user, f);
+  encode_json("topic", topic, f);
+}
+
+void rgw_pubsub_user_topics::dump(Formatter *f) const
+{
+  Formatter::ObjectSection s(*f, "topics");
+  for (auto& t : topics) {
+    encode_json(t.first.c_str(), t.second, f);
+  }
+}
+
+void rgw_pubsub_user_sub_dest::dump(Formatter *f) const
+{
+  encode_json("bucket_name", bucket_name, f);
+  encode_json("oid_prefix", oid_prefix, f);
+  encode_json("push_endpoint", push_endpoint, f);
+}
+
+void rgw_pubsub_user_sub_config::dump(Formatter *f) const
+{
+  encode_json("user", user, f);
+  encode_json("name", name, f);
+  encode_json("topic", topic, f);
+  encode_json("dest", dest, f);
+}
+
+static string pubsub_user_oid_prefix = "pubsub.user.";
+
+
+class RGWUserPubSub
+{
+  RGWRados *store;
+  rgw_user user;
+  RGWObjectCtx obj_ctx;
+
+  template <class T>
+  int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
+
+  template <class T>
+  int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);
+
+  int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
+public:
+  RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
+                                                           user(_user),
+                                                           obj_ctx(store) {}
+
+  string user_meta_oid() const {
+    return pubsub_user_oid_prefix + user.to_str();
+  }
+
+  string bucket_meta_oid(const rgw_bucket& bucket) const {
+    return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
+  }
+
+  string sub_meta_oid(const string& name) const {
+    return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
+  }
+
+  void get_user_meta_obj(rgw_raw_obj *obj) const {
+    *obj = rgw_raw_obj(store->get_zone_params().log_pool, user_meta_oid());
+  }
+
+  void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
+    *obj = rgw_raw_obj(store->get_zone_params().log_pool, bucket_meta_oid(bucket));
+  }
+
+  void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
+    *obj = rgw_raw_obj(store->get_zone_params().log_pool, sub_meta_oid(name));
+  }
+
+  int get_topics(rgw_pubsub_user_topics *result);
+  int get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result);
+  int create_topic(const string& name, const rgw_bucket& bucket);
+  int remove_topic(const string& name);
+  int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
+  int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
+};
+
+template <class T>
+int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
+{
+  bufferlist bl;
+  int ret = rgw_get_system_obj(store, obj_ctx,
+                               obj.pool, obj.oid,
+                               bl,
+                               objv_tracker,
+                               nullptr, nullptr, nullptr);
+  if (ret < 0) {
+    return ret;
+  }
+
+  auto iter = bl.cbegin();
+  try {
+    decode(*result, iter);
+  } catch (buffer::error& err) {
+    ldout(store->ctx(), 0) << "ERROR: failed to decode info, caught buffer::error" << dendl;
+    return -EIO;
+  }
+
+  return 0;
+}
+
+template <class T>
+int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker)
+{
+  bufferlist bl;
+  encode(info, bl);
+
+  int ret = rgw_put_system_obj(store, obj.pool, obj.oid,
+                           bl, false, objv_tracker,
+                           real_time());
+  if (ret < 0) {
+    return ret;
+  }
+
+  return 0;
+}
+
+int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
+{
+  int ret = rgw_delete_system_obj(store, obj.pool, obj.oid, objv_tracker);
+  if (ret < 0) {
+    return ret;
+  }
+
+  return 0;
+}
+
+int RGWUserPubSub::get_topics(rgw_pubsub_user_topics *result)
+{
+  rgw_raw_obj obj;
+  get_user_meta_obj(&obj);
+
+  RGWObjVersionTracker objv_tracker;
+  int ret = read(obj, result, &objv_tracker);
+  if (ret < 0 && ret != -ENOENT) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+    return ret;
+  }
+  return 0;
+}
+
+int RGWUserPubSub::get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result)
+{
+  rgw_raw_obj obj;
+  get_bucket_meta_obj(bucket, &obj);
+
+  RGWObjVersionTracker objv_tracker;
+  int ret = read(obj, result, &objv_tracker);
+  if (ret < 0 && ret != -ENOENT) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+    return ret;
+  }
+  return 0;
+}
+
+
+int RGWUserPubSub::create_topic(const string& name, const rgw_bucket& bucket)
+{
+  rgw_raw_obj obj;
+  get_user_meta_obj(&obj);
+
+  RGWObjVersionTracker objv_tracker;
+  rgw_pubsub_user_topics topics;
+
+  int ret = read(obj, &topics, &objv_tracker);
+  if (ret < 0 && ret != -ENOENT) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  rgw_pubsub_user_topic_info new_topic;
+  new_topic.user = user;
+  new_topic.topic.name = name;
+  new_topic.topic.bucket = bucket;
+
+  topics.topics[name] = new_topic;
+
+  ret = write(obj, topics, &objv_tracker);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  rgw_pubsub_user_topics bucket_topics;
+  for (auto& t : topics.topics) {
+    if (t.second.topic.bucket == bucket) {
+      bucket_topics.topics.insert(t);
+    }
+  }
+
+  rgw_raw_obj bobj;
+  get_bucket_meta_obj(bucket, &bobj);
+  ret = write(obj, bucket_topics, nullptr);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+    return ret;
+  }
+  return 0;
+}
+
+int RGWUserPubSub::remove_topic(const string& name)
+{
+  rgw_raw_obj obj;
+  get_user_meta_obj(&obj);
+
+  RGWObjVersionTracker objv_tracker;
+  rgw_pubsub_user_topics topics;
+
+  int ret = read(obj, &topics, &objv_tracker);
+  if (ret < 0 && ret != -ENOENT) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  rgw_bucket bucket;
+  auto t = topics.topics.find(name);
+  if (t != topics.topics.end()) {
+    bucket = t->second.topic.bucket;
+  }
+
+  topics.topics.erase(name);
+
+  ret = write(obj, topics, &objv_tracker);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  if (bucket.name.empty()) {
+    return 0;
+  }
+
+  rgw_pubsub_user_topics bucket_topics;
+  for (auto& t : topics.topics) {
+    if (t.second.topic.bucket == bucket) {
+      bucket_topics.topics.insert(t);
+    }
+  }
+
+  rgw_raw_obj bobj;
+  get_bucket_meta_obj(bucket, &bobj);
+  ret = write(obj, bucket_topics, nullptr);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+    return ret;
+  }
+  return 0;
+}
+
+int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest)
+{
+  rgw_raw_obj obj;
+  get_user_meta_obj(&obj);
+
+  RGWObjVersionTracker objv_tracker;
+  rgw_pubsub_user_topics topics;
+
+  int ret = read(obj, &topics, &objv_tracker);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  auto iter = topics.topics.find(name);
+  if (iter == topics.topics.end()) {
+    ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
+    return -ENOENT;
+  }
+
+  auto& t = iter->second;
+
+  rgw_pubsub_user_sub_config sub_conf;
+
+  sub_conf.user = user;
+  sub_conf.name = name;
+  sub_conf.topic = topic;
+  sub_conf.dest = dest;
+
+  t.subs.insert(name);
+
+  ret = write(obj, topics, &objv_tracker);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  rgw_raw_obj bobj;
+  get_sub_meta_obj(name, &bobj);
+  ret = write(obj, sub_conf, nullptr);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
+    return ret;
+  }
+  return 0;
+}
+
+int RGWUserPubSub::remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest)
+{
+  rgw_raw_obj obj;
+  get_user_meta_obj(&obj);
+
+  RGWObjVersionTracker objv_tracker;
+  rgw_pubsub_user_topics topics;
+
+  int ret = read(obj, &topics, &objv_tracker);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+  }
+
+  if (ret >= 0) {
+    auto iter = topics.topics.find(name);
+    if (iter == topics.topics.end()) {
+      ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
+    } else {
+      auto& t = iter->second;
+
+      t.subs.erase(name);
+
+      ret = write(obj, topics, &objv_tracker);
+      if (ret < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+        return ret;
+      }
+    }
+  }
+
+  rgw_raw_obj sobj;
+  get_sub_meta_obj(name, &sobj);
+  ret = remove(obj, nullptr);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
+    return ret;
+  }
+  return 0;
+}
+
diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h
new file mode 100644 (file)
index 0000000..2239ab2
--- /dev/null
@@ -0,0 +1,186 @@
+#ifndef CEPH_RGW_PUBSUB_H
+#define CEPH_RGW_PUBSUB_H
+
+#include "rgw_common.h"
+
+
+enum RGWPubSubEventType {
+  EVENT_UNKNOWN        = 0,
+  OBJECT_CREATE        = 1,
+  OBJECT_DELETE        = 2,
+  DELETE_MARKER_CREATE = 3,
+};
+
+struct rgw_pubsub_event {
+  string id;
+  rgw_bucket bucket;
+  rgw_obj_key key;
+  ceph::real_time mtime;
+
+  RGWPubSubEventType event;
+  ceph::real_time timestamp;
+
+  std::vector<std::pair<std::string, std::string> > attrs;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(id, bl);
+    encode(bucket, bl);
+    encode(key, bl);
+    encode(mtime, bl);
+    uint32_t e = (uint32_t)event;
+    encode(e, bl);
+    encode(timestamp, bl);
+    encode(attrs, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(id, bl);
+    decode(bucket, bl);
+    decode(key, bl);
+    decode(mtime, bl);
+    uint32_t e;
+    decode(e, bl);
+    event = (RGWPubSubEventType)e;
+    decode(timestamp, bl);
+    decode(attrs, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(rgw_pubsub_event)
+
+struct rgw_pubsub_user_sub_dest {
+  string bucket_name;
+  string oid_prefix;
+  string push_endpoint;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(bucket_name, bl);
+    encode(oid_prefix, bl);
+    encode(push_endpoint, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(bucket_name, bl);
+    decode(oid_prefix, bl);
+    decode(push_endpoint, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(rgw_pubsub_user_sub_dest)
+
+struct rgw_pubsub_user_sub_config {
+  rgw_user user;
+  string name;
+  string topic;
+  rgw_pubsub_user_sub_dest dest;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(user, bl);
+    encode(name, bl);
+    encode(topic, bl);
+    encode(dest, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(user, bl);
+    decode(name, bl);
+    decode(topic, bl);
+    decode(dest, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(rgw_pubsub_user_sub_config)
+
+struct rgw_pubsub_user_topic {
+  string name;
+  rgw_bucket bucket;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(name, bl);
+    encode(bucket, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(name, bl);
+    decode(bucket, bl);
+    DECODE_FINISH(bl);
+  }
+
+  const string& to_str() const {
+    return name;
+  }
+  void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(rgw_pubsub_user_topic)
+
+struct rgw_pubsub_user_topic_info {
+  rgw_user user;
+  rgw_pubsub_user_topic topic;
+  set<string> subs;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(user, bl);
+    encode(topic, bl);
+    encode(subs, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(user, bl);
+    decode(topic, bl);
+    decode(subs, bl);
+    DECODE_FINISH(bl);
+  }
+
+  string to_str() const {
+    return user.to_str() + "/" + topic.name;
+  }
+
+  void dump(Formatter *f) const;
+
+  bool operator<(const rgw_pubsub_user_topic& t) const {
+    return to_str().compare(t.to_str());
+  }
+};
+WRITE_CLASS_ENCODER(rgw_pubsub_user_topic_info)
+
+struct rgw_pubsub_user_topics {
+  map<string, rgw_pubsub_user_topic_info> topics;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(topics, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(topics, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
+
+#endif