rgw: initial pub sub work
author    Yehuda Sadeh <yehuda@redhat.com>
          Tue, 26 Jun 2018 23:02:14 +0000 (16:02 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
          Tue, 11 Dec 2018 08:10:41 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_sync_module_pubsub.cc [new file with mode: 0644]
src/rgw/rgw_sync_module_pubsub.h [new file with mode: 0644]

diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt
index 502ab0891acb35d182578a2cff93ad3b6f1aac94..1072631f072d9f92aa686fc82944270e43a62b40 100644
@@ -77,6 +77,7 @@ set(librgw_common_srcs
   rgw_metadata.cc
   rgw_multi.cc
   rgw_multi_del.cc
+  rgw_pubsub.cc
   rgw_sync.cc
   rgw_data_sync.cc
   rgw_sync_module.cc
@@ -84,6 +85,7 @@ set(librgw_common_srcs
   rgw_sync_module_es.cc
   rgw_sync_module_es_rest.cc
   rgw_sync_module_log.cc
+  rgw_sync_module_pubsub.cc
   rgw_sync_log_trim.cc
   rgw_sync_trace.cc
   rgw_period_history.cc
diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc
new file mode 100644
index 0000000..49cd9da
--- /dev/null
+++ b/src/rgw/rgw_sync_module_pubsub.cc
@@ -0,0 +1,387 @@
+#include "rgw_common.h"
+#include "rgw_coroutine.h"
+#include "rgw_sync_module.h"
+#include "rgw_data_sync.h"
+#include "rgw_sync_module_pubsub.h"
+#include "rgw_rest_conn.h"
+#include "rgw_cr_rados.h"
+#include "rgw_cr_tools.h"
+#include "rgw_op.h"
+#include "rgw_pubsub.h"
+
+#include <boost/asio/yield.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+
+
+/*
+
+config:
+
+{
+   "tenant": <tenant>,             # default: <empty>
+   "uid": <uid>,                   # default: "pubsub"
+   "data_bucket_prefix": <prefix>  # default: "pubsub-"
+   "data_oid_prefix": <prefix>     #
+
+    # non-dynamic config
+    "notifications": [
+        {
+            "path": <notification-path>,    # this can be either an explicit path: <bucket>, or <bucket>/<object>,
+                                            # or a prefix if it ends with a wildcard
+            "topic": <topic-name>
+         },
+        ...
+    ],
+    "subscriptions": [
+        {
+            "name": <subscription-name>,
+            "topic": <topic>,
+            "push_endpoint": <endpoint>,
+            "data_bucket": <bucket>,       # override name of bucket where subscription data will be store
+            "data_oid_prefix": <prefix>    # set prefix for subscription data object ids
+        },
+        ...
+    ]
+}
+
+*/
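+
+/* an example instantiation of the above (all values hypothetical):
+
+{
+    "uid": "pubsub",
+    "data_bucket_prefix": "pubsub-",
+    "notifications": [
+        { "path": "mybucket/*", "topic": "mytopic" }
+    ],
+    "subscriptions": [
+        { "name": "mysub", "topic": "mytopic",
+          "push_endpoint": "http://localhost:8080/push" }
+    ]
+}
+
+*/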
+
+struct PSSubConfig { /* subscription config */
+  string name;
+  string topic;
+  string push_endpoint;
+
+  void init(CephContext *cct, const JSONFormattable& config) {
+    name = config["name"];
+    topic = config["topic"];
+    push_endpoint = config["push_endpoint"];
+  }
+};
+
+struct PSTopicConfig {
+  string name;
+};
+
+struct PSNotificationConfig {
+  string path; /* a path or a path prefix that triggers the event (treated as a prefix if it ends with a wildcard '*') */
+  string topic;
+
+  uint64_t id{0};
+  bool is_prefix{false};
+
+  void init(CephContext *cct, const JSONFormattable& config) {
+    path = config["path"];
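+    /* a trailing '*' turns the path into a prefix match, e.g.
+     * "mybucket/photos/*" covers "mybucket/photos/2018.jpg" */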
+    if (!path.empty() && path[path.size() - 1] == '*') {
+      path = path.substr(0, path.size() - 1);
+      is_prefix = true;
+    }
+    topic = config["topic"];
+  }
+};
+
+template<class T>
+static string json_str(const char *name, const T& obj, bool pretty = false)
+{
+  stringstream ss;
+  JSONFormatter f(pretty);
+
+  encode_json(name, obj, &f);
+  f.flush(ss);
+
+  return ss.str();
+}
+
+
+struct PSConfig {
+  string id{"pubsub"};
+  string ps_uid{"pubsub"};
+  uint64_t sync_instance{0};
+  uint32_t num_pub_shards{0};
+  uint32_t num_topic_shards{0};
+  uint64_t max_id{0};
+
+  /* FIXME: no hard coded buckets, we'll have configurable topics */
+  vector<PSSubConfig> subscriptions;
+  map<string, PSTopicConfig> topics;
+  multimap<string, PSNotificationConfig> notifications;
+
+  void init(CephContext *cct, const JSONFormattable& config) {
+    ps_uid = config["pubsub"];
+    num_pub_shards = config["num_pub_shards"](PS_NUM_PUB_SHARDS_DEFAULT);
+    if (num_pub_shards < PS_NUM_PUB_SHARDS_MIN) {
+      num_pub_shards = PS_NUM_PUB_SHARDS_MIN;
+    }
+
+    num_topic_shards = config["num_topic_shards"](PS_NUM_TOPIC_SHARDS_DEFAULT);
+    if (num_topic_shards < PS_NUM_TOPIC_SHARDS_MIN) {
+      num_topic_shards = PS_NUM_TOPIC_SHARDS_MIN;
+    }
+    /* FIXME: this will be dynamically configured */
+    for (auto& c : config["notifications"].array()) {
+      PSNotificationConfig nc;
+      nc.id = ++max_id;
+      nc.init(cct, c);
+      notifications.insert(std::make_pair(nc.path, nc));
+
+      PSTopicConfig topic_config = { .name = nc.topic };
+      topics[nc.topic] = topic_config;
+    }
+    for (auto& c : config["subscriptions"].array()) {
+      PSSubConfig sc;
+      sc.init(cct, c);
+      subscriptions.push_back(sc);
+    }
+
+    ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
+  }
+
+  void init_instance(RGWRealm& realm, uint64_t instance_id) {
+    sync_instance = instance_id;
+  }
+
+  void get_notifs(const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector<PSNotificationConfig *> *notifs) {
+    string path = bucket_info.bucket.name + "/" + key.name;
+
+    notifs->clear();
+
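+    /* notifications is a multimap sorted by path; any entry that can match
+     * this object (its exact path or a prefix of it) sorts at or before
+     * "path", so start at upper_bound(path) and walk backwards */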
+    auto iter = notifications.upper_bound(path);
+    if (iter == notifications.begin()) {
+      return;
+    }
+
+    do {
+      --iter;
+      if (iter->first.size() > path.size()) {
+        break;
+      }
+      if (path.compare(0, iter->first.size(), iter->first) != 0) {
+        break;
+      }
+
+      PSNotificationConfig *target = &iter->second;
+
+      if (!target->is_prefix &&
+          path.size() != iter->first.size()) {
+        continue;
+      }
+
+      notifs->push_back(target);
+    } while (iter != notifications.begin());
+  }
+};
+
+using PSConfigRef = std::shared_ptr<PSConfig>;
+
+class RGWPSInitConfigCBCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  PSConfigRef conf;
+
+  rgw_user_create_params create_user;
+public:
+  RGWPSInitConfigCBCR(RGWDataSyncEnv *_sync_env,
+                          PSConfigRef _conf) : RGWCoroutine(_sync_env->cct),
+                                                    sync_env(_sync_env),
+                                                    conf(_conf) {}
+  int operate() override {
+    reenter(this) {
+      ldout(sync_env->cct, 0) << ": init pubsub config zone=" << sync_env->source_zone << dendl;
+
+      /* nothing to do here right now */
+      create_user.uid = conf->ps_uid;
+      create_user.max_buckets = 0; /* unlimited */
+      create_user.display_name = "pubsub";
+      create_user.generate_key = false;
+      yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user));
+      if (retcode < 0) {
+        ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
+        return set_cr_error(retcode);
+      }
+
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
+class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+  PSConfigRef conf;
+  uint64_t versioned_epoch;
+  vector<PSNotificationConfig *> notifs;
+  vector<PSNotificationConfig *>::iterator niter;
+public:
+  RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
+                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+                          PSConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
+                                                                               versioned_epoch(_versioned_epoch) {
+#warning this will need to change obviously
+    conf->get_notifs(_bucket_info, _key, &notifs);
+  }
+  int operate() override {
+    reenter(this) {
+      ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
+                               << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+                               << " attrs=" << attrs << dendl;
+
+
+      for (niter = notifs.begin(); niter != notifs.end(); ++niter) {
+        yield {
+          ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": id=" << (*niter)->id << " path=" << (*niter)->path << ", topic=" << (*niter)->topic << dendl;
+
+#warning publish notification
+#if 0
+        string path = conf->get_obj_path(bucket_info, key);
+        es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
+
+        call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
+                                                            sync_env->http_manager,
+                                                            path, nullptr /* params */,
+                                                            doc, nullptr /* result */));
+#endif
+        }
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
+class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+  PSConfigRef conf;
+  uint64_t versioned_epoch;
+public:
+  RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
+                        RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+                        PSConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+                                                           conf(_conf), versioned_epoch(_versioned_epoch) {
+  }
+
+  ~RGWPSHandleRemoteObjCR() override {}
+
+  RGWStatRemoteObjCBCR *allocate_callback() override {
+#warning things need to change
+    /* FIXME: we need to create a pre_callback coroutine that decides whether object should
+     * actually be handled. Otherwise we fetch info from remote zone about every object, even
+     * if we don't intend to handle it.
+     */
+    return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
+  }
+};
+
+class RGWPSRemoveRemoteObjCBCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWBucketInfo bucket_info;
+  rgw_obj_key key;
+  ceph::real_time mtime;
+  PSConfigRef conf;
+public:
+  RGWPSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
+                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
+                          PSConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+                                                        bucket_info(_bucket_info), key(_key),
+                                                        mtime(_mtime), conf(_conf) {}
+  int operate() override {
+    reenter(this) {
+      ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
+                               << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
+      yield {
+#if 0
+        string path = conf->get_obj_path(bucket_info, key);
+
+        call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
+                                         sync_env->http_manager,
+                                         path, nullptr /* params */));
+#endif
+      }
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+
+};
+
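+/* glue between the parsed pubsub config and the data sync machinery:
+ * sync_object stats the remote object and fans out any matching
+ * notifications; remove_object and create_delete_marker are still
+ * placeholders for the corresponding events */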
+class RGWPSDataSyncModule : public RGWDataSyncModule {
+  PSConfigRef conf;
+public:
+  RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : conf(std::make_shared<PSConfig>()) {
+    conf->init(cct, config);
+  }
+  ~RGWPSDataSyncModule() override {}
+
+  void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
+    conf->init_instance(sync_env->store->get_realm(), instance_id);
+  }
+
+  RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
+    ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
+    return new RGWPSInitConfigCBCR(sync_env, conf);
+  }
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+    ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
+#warning this should be done correctly
+#if 0
+    if (!conf->should_handle_operation(bucket_info)) {
+      ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
+      return nullptr;
+    }
+#endif
+    return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch);
+  }
+  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+    /* versioned and versioned_epoch params are not used by this module */
+    ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+#warning this should be done correctly
+#if 0
+    if (!conf->should_handle_operation(bucket_info)) {
+      ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
+      return nullptr;
+    }
+#endif
+    return new RGWPSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
+  }
+  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+    ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+                            << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+#warning requests should be filtered correctly
+#if 0
+    ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
+#endif
+#warning delete markers need to be handled too
+    return nullptr;
+  }
+};
+
+RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
+{
+  data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
+}
+
+RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
+{
+  return data_handler.get();
+}
+
+RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
+#warning REST filter implementation missing
+#if 0
+  if (dialect != RGW_REST_S3) {
+    return orig;
+  }
+  delete orig;
+  return new RGWRESTMgr_MDSearch_S3();
+#endif
+  return orig;
+}
+
+int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
+  instance->reset(new RGWPSSyncModuleInstance(cct, config));
+  return 0;
+}
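+
+/* a zone would opt in to this module through its tier type, mirroring how
+ * the other sync modules are enabled (a sketch, not part of this patch;
+ * tier-config keys follow the schema documented at the top of this file):
+ *
+ *   radosgw-admin zone modify --rgw-zone={zone-name} --tier-type=pubsub \
+ *       --tier-config=uid=pubsub,data_bucket_prefix=pubsub-
+ */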
+
diff --git a/src/rgw/rgw_sync_module_pubsub.h b/src/rgw/rgw_sync_module_pubsub.h
new file mode 100644
index 0000000..a7e91cf
--- /dev/null
+++ b/src/rgw/rgw_sync_module_pubsub.h
@@ -0,0 +1,26 @@
+#ifndef CEPH_RGW_SYNC_MODULE_PUBSUB_H
+#define CEPH_RGW_SYNC_MODULE_PUBSUB_H
+
+#include "rgw_sync_module.h"
+
+class RGWPSSyncModule : public RGWSyncModule {
+public:
+  RGWPSSyncModule() {}
+  bool supports_data_export() override {
+    return false;
+  }
+  int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
+};
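+
+/* the module still needs to be registered with the sync modules manager;
+ * a sketch following the pattern used for the other modules (assumes the
+ * existing rgw_register_sync_modules() hook, not part of this patch):
+ *
+ *   RGWSyncModuleRef pubsub_module(std::make_shared<RGWPSSyncModule>());
+ *   modules_manager->register_module("pubsub", pubsub_module);
+ */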
+
+class RGWPSDataSyncModule;
+class RGWRESTConn;
+
+class RGWPSSyncModuleInstance : public RGWSyncModuleInstance {
+  std::unique_ptr<RGWPSDataSyncModule> data_handler;
+public:
+  RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config);
+  RGWDataSyncModule *get_data_handler() override;
+  RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override;
+};
+
+#endif