]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: pubsub: subscription init and more
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 29 Jun 2018 04:10:40 +0000 (21:10 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rados.h
src/rgw/rgw_cr_tools.cc
src/rgw/rgw_cr_tools.h
src/rgw/rgw_sync_module_pubsub.cc

index 857961998924d1df5abff5a0670929b60027acd1..516cf8d6044a661f4a3e942e488016e077450e6a 100644 (file)
@@ -204,7 +204,8 @@ class RGWSimpleAsyncCR : public RGWSimpleCoroutine {
     req = new Request(this,
                       stack->create_completion_notifier(),
                       store,
-                      params);
+                      params,
+                      result);
 
     async_rados->queue(req);
     return 0;
index 6176cc4df55f6f8f26825f8b54d50abc7d59ecb8..d7dbc6d7bb9f8c2b201170ae0399fc80ead681bc 100644 (file)
@@ -16,11 +16,9 @@ int RGWUserCreateCR::Request::_send_request()
 
   RGWUserAdminOpState op_state;
 
-  rgw_user uid(params.uid);
+  auto& user = params.user;
 
-  uid.tenant = params.tenant_name;
-
-  op_state.set_user_id(uid);
+  op_state.set_user_id(user);
   op_state.set_display_name(params.display_name);
   op_state.set_user_email(params.email);
   op_state.set_caps(params.caps);
@@ -86,8 +84,15 @@ int RGWUserCreateCR::Request::_send_request()
 template<>
 int RGWGetUserInfoCR::Request::_send_request()
 {
-  rgw_user user(params.tenant, params.uid);
-  return rgw_get_user_info_by_uid(store, user, result->user_info);
+  return rgw_get_user_info_by_uid(store, params.user, *result);
+}
+
+template<>
+int RGWGetBucketInfoCR::Request::_send_request()
+{
+  RGWObjectCtx obj_ctx(store);
+  return store->get_bucket_info(obj_ctx, params.tenant, params.bucket_name,
+                                result->bucket_info, &result->mtime, &result->attrs);
 }
 
 template<>
index 8ac6626362804976f982b717ad016df1c4ce7fa4..ab69d4c0e0482a72dadc3c7175a9ab11d7fd3fd7 100644 (file)
@@ -5,14 +5,13 @@
 
 
 struct rgw_user_create_params {
-  std::string uid;
+  rgw_user user;
   std::string display_name;
   std::string email;
   std::string access_key;
   std::string secret_key;
   std::string key_type; /* "swift" or "s3" */
   std::string caps;
-  std::string tenant_name;
 
   bool generate_key{true};
   bool suspended{false};
@@ -24,17 +23,24 @@ struct rgw_user_create_params {
 
 using RGWUserCreateCR = RGWSimpleWriteOnlyAsyncCR<rgw_user_create_params>;
 
-
 struct rgw_get_user_info_params {
-  std::string uid;
-  std::string tenant;
+  rgw_user user;
+};
+
+using RGWGetUserInfoCR = RGWSimpleAsyncCR<rgw_get_user_info_params, RGWUserInfo>;
+
+struct rgw_get_bucket_info_params {
+  string tenant;
+  string bucket_name;
 };
 
-struct rgw_get_user_info_result {
-  RGWUserInfo user_info;
+struct rgw_get_bucket_info_result {
+  ceph::real_time mtime;
+  RGWBucketInfo bucket_info;
+  map<string, bufferlist> attrs;
 };
 
-using RGWGetUserInfoCR = RGWSimpleAsyncCR<rgw_get_user_info_params, rgw_get_user_info_result>;
+using RGWGetBucketInfoCR = RGWSimpleAsyncCR<rgw_get_bucket_info_params, rgw_get_bucket_info_result>;
 
 struct rgw_bucket_create_local_params {
   shared_ptr<RGWUserInfo> user_info;
index 49cd9da93855968eb2778c6c507f696720c1a6d4..abcf83a894c427de843d664b81ec8308e2c12f5e 100644 (file)
@@ -52,15 +52,33 @@ struct PSSubConfig { /* subscription config */
   string topic;
   string push_endpoint;
 
-  void init(CephContext *cct, const JSONFormattable& config) {
+  string data_bucket_name;
+  string data_prefix;
+
+  void dump(Formatter *f) const {
+    encode_json("name", name, f);
+    encode_json("topic", topic, f);
+    encode_json("push_endpoint", push_endpoint, f);
+    encode_json("data_bucket_name", data_bucket_name, f);
+    encode_json("data_oid_prefix", data_oid_prefix, f);
+  }
+
+  void init(CephContext *cct, const JSONFormattable& config,
+            const string& data_bucket_prefix) {
     name = config["name"];
     topic = config["topic"];
     push_endpoint = config["push_endpoint"];
+    string default_bucket_name = data_prefix + name;
+    data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
+    data_prefix = config["data_prefix"];
   }
 };
 
+using  PSSubConfigRef = std::shared_ptr<PSSubConfig>;
+
 struct PSTopicConfig {
   string name;
+  set<string> subs;
 };
 
 struct PSNotificationConfig {
@@ -95,19 +113,25 @@ static string json_str(const char *name, const T& obj, bool pretty = false)
 
 struct PSConfig {
   string id{"pubsub"};
-  string ps_uid{"pubsub"};
+  rgw_user user;
+  string data_bucket_prefix;
+
   uint64_t sync_instance{0};
   uint32_t num_pub_shards{0};
   uint32_t num_topic_shards{0};
   uint64_t max_id{0};
 
+
   /* FIXME: no hard coded buckets, we'll have configurable topics */
-  vector<PSSubConfig> subscriptions;
+  map<string, PSSubConfigRef> subs;
   map<string, PSTopicConfig> topics;
   multimap<string, PSNotificationConfig> notifications;
 
   void init(CephContext *cct, const JSONFormattable& config) {
-    ps_uid = config["pubsub"];
+    string uid = config["uid"]("pubsub");
+    user = rgw_user(config["tenant"], uid);
+    data_bucket_prefix = config["data_bucket_prefix"]("pubsub");
+
     num_pub_shards = config["num_pub_shards"](PS_NUM_PUB_SHARDS_DEFAULT);
     if (num_pub_shards < PS_NUM_PUB_SHARDS_MIN) {
       num_pub_shards = PS_NUM_PUB_SHARDS_MIN;
@@ -128,9 +152,10 @@ struct PSConfig {
       topics[nc.topic] = topic_config;
     }
     for (auto& c : config["subscriptions"].array()) {
-      PSSubConfig sc;
-      sc.init(cct, c);
-      subscriptions.push_back(sc);
+      auto sc = std::make_shared<PSSubConfig>();
+      sc->init(cct, c, data_bucket_prefix);
+      subs[sc->name] = sc;
+      topics[sc->topic].subs.insert(sc->name);
     }
 
     ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
@@ -169,26 +194,242 @@ struct PSConfig {
       notifs->push_back(target);
     } while (iter != notifications.begin());
   }
+
+  bool find_sub(const string& name, PSSubConfigRef *ref) {
+    auto iter = subs.find(name);
+    if (iter != subs.end()) {
+      *ref = iter->second;
+      return true;
+    }
+    return false;
+  }
 };
 
 using PSConfigRef = std::shared_ptr<PSConfig>;
+using EventRef = std::shared_ptr<rgw_pubsub_event>;
 
-class RGWPSInitConfigCBCR : public RGWCoroutine {
-  RGWDataSyncEnv *sync_env;
+class PSManager;
+using PSManagerRef = std::shared_ptr<PSManager>;
+
+struct PSEnv {
   PSConfigRef conf;
+  shared_ptr<RGWUserInfo> data_user_info;
+  PSManagerRef manager;
+
+  PSEnv() : conf(make_shared<PSConfig>()),
+            data_user_info(make_shared<RGWUserInfo>()) {}
+
+  void init(CephContext *cct, const JSONFormattable& config) {
+    conf->init(cct, config);
+  }
+
+  void init_instance(RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr);
+};
+
+using PSEnvRef = std::shared_ptr<PSEnv>;
+
+class PSSubscription : public RefCountedObject {
+  RGWDataSyncEnv *sync_env;
+  PSEnvRef env;
+  PSSubConfigRef sub_conf;
+  shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
+  RGWBucketInfo *bucket_info{nullptr};
+
+public:
+
+  class InitCR : public RGWCoroutine {
+    RGWDataSyncEnv *sync_env;
+    PSSubscriptionRef sub;
+    rgw_get_bucket_info_params get_bucket_info;
+    rgw_bucket_create_local_params create_bucket;
+    PSConfigRef& conf;
+    PSSubConfigRef& sub_conf;
+    int i;
+  public:
+    InitCR(RGWDataSyncEnv *_sync_env,
+           PSSubscriptionRef& _sub) : RGWCoroutine(_sync_env->cct),
+                                    sync_env(_sync_env),
+                                    sub(_sub), conf(sub->env->conf),
+                                    sub_conf(sub->sub_conf) {
+    }
+
+    int operate() override {
+      reenter(this) {
+        get_bucket_info.tenant = conf->user.tenant;
+        get_bucket_info.bucket_name = sub_conf->data_bucket_name;
+        sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>();
+
+        for (i = 0; i < 2; ++i) {
+          yield call(new RGWGetBucketInfoCR(sync_env->async_rados,
+                                            sync_env->store,
+                                            get_bucket_info,
+                                            sub->get_bucket_info_result));
+          if (retcode < 0 && retcode != -ENOENT) {
+            ldout(sync_env->cct, 0) << "ERROR: failed to geting bucket info: " << "tenant="
+              << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
+          }
+          if (retcode == 0) {
+            sub->bucket_info = &sub->get_bucket_info_result->bucket_info;
+            return set_cr_done();
+          }
+
+          create_bucket.user_info = sub->env->data_user_info;
+          create_bucket.bucket_name = sub_conf->data_bucket_name;
+          ldout(sync_env->cct, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
+          yield call(new RGWBucketCreateLocalCR(sync_env->async_rados,
+                                                sync_env->store,
+                                                create_bucket));
+          if (retcode < 0) {
+            ldout(sync_env->cct, 0) << "ERROR: failed to create bucket: " << "tenant="
+              << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
+            return set_cr_error(retcode);
+          }
+ldout(sync_env->cct, 20) << "pubsub: bucket create: after user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
+
+          /* second iteration: we got -ENOENT and created a bucket */
+        }
+
+        /* failed twice on -ENOENT, unexpected */
+        ldout(sync_env->cct, 0) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
+          << " name=" << get_bucket_info.bucket_name << dendl;
+        return set_cr_error(-EIO);
+      }
+      return 0;
+    }
+  };
+
+
+public:
+  PSSubscription(RGWDataSyncEnv *_sync_env,
+                 PSEnvRef _env,
+                 PSSubConfigRef& _sub_conf) : sync_env(_sync_env),
+                                      env(_env),
+                                      sub_conf(_sub_conf) {}
+
+  RGWCoroutine *init_cr() {
+    return new InitCR(sync_env, this);
+  }
+
+  RGWCoroutine *store_event_cr(EventRef event);
+
+  friend class InitCR;
+};
+
+using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
+
+
+RGWCoroutine *PSSubscription::store_event_cr(EventRef event)
+{
+  /* FIXME */
+}
+
+
+class PSManager : public RefCountedObject
+{
+  RGWDataSyncEnv *sync_env;
+  PSEnvRef env;
+
+  map<string, PSSubscriptionRef> subs;
+
+  class GetSubCR : public RGWCoroutine {
+    RGWDataSyncEnv *sync_env;
+    PSManagerRef mgr;
+    const string& sub_name;
+    PSSubscriptionRef *ref;
+
+    PSConfigRef& conf;
+
+    PSSubConfigRef sub_conf;
+  public:
+    GetSubCR(RGWDataSyncEnv *_sync_env,
+                      PSManagerRef& _mgr,
+                      const string& _sub_name,
+                      PSSubscriptionRef *_ref) : RGWCoroutine(_sync_env->cct),
+                                                 sync_env(_sync_env),
+                                                 mgr(_mgr),
+                                                 sub_name(_sub_name),
+                                                 ref(_ref),
+                                                 conf(mgr->env->conf) {
+    }
+
+    int operate() override {
+      reenter(this) {
+        if (!conf->find_sub(sub_name, &sub_conf)) {
+          ldout(sync_env->cct, 0) << "ERROR: could not find subscription config: name=" << sub_name << dendl;
+          return set_cr_error(-ENOENT);
+        }
+
+        *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
+
+        yield call((*ref)->init_cr());
+        if (retcode < 0) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to init subscription" << dendl;
+          ref->reset();
+          return set_cr_error(retcode);
+        }
+
+        mgr->subs[sub_name] = *ref;
+        return set_cr_done();
+      }
+      return 0;
+    }
+  };
+
+  bool find_sub_instance(const string& sub_name, PSSubscriptionRef *sub) {
+    auto iter = subs.find(sub_name);
+    if (iter != subs.end()) {
+      *sub = iter->second;
+      return true;
+    }
+    return false;
+  }
+
+  PSManager(RGWDataSyncEnv *_sync_env,
+            PSEnvRef _env) : sync_env(_sync_env),
+                             env(_env) {}
+
+public:
+  static PSManagerRef& get_shared(RGWDataSyncEnv *_sync_env,
+                                 PSEnvRef _env) {
+    auto mgr = new PSManager(_sync_env, _env);
+    mgr->self = std::shared_ptr<PSManager>(mgr);
+    return mgr->self;
+  }
+
+  RGWCoroutine *get_subscription_cr(const string& sub_name, PSSubscriptionRef *ref) {
+    if (find_sub_instance(sub_name, ref)) {
+      /* found it! nothing to execute */
+      return nullptr;
+    }
+    return new GetSubCR(sync_env, self, sub_name, ref);
+  }
+
+  friend class GetSubCR;
+};
+
+void PSEnv::init_instance(RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) {
+  manager = mgr;
+  conf->init_instance(realm, instance_id);
+}
+
+class RGWPSInitEnvCBCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  PSEnvRef env;
+  PSConfigRef& conf;
 
   rgw_user_create_params create_user;
+  rgw_get_user_info_params get_user_info;
 public:
-  RGWPSInitConfigCBCR(RGWDataSyncEnv *_sync_env,
-                          PSConfigRef _conf) : RGWCoroutine(_sync_env->cct),
+  RGWPSInitEnvCBCR(RGWDataSyncEnv *_sync_env,
+                       PSEnvRef _env) : RGWCoroutine(_sync_env->cct),
                                                     sync_env(_sync_env),
-                                                    conf(_conf) {}
+                                                    env(_env), conf(env->conf) {}
   int operate() override {
     reenter(this) {
       ldout(sync_env->cct, 0) << ": init pubsub config zone=" << sync_env->source_zone << dendl;
 
       /* nothing to do here right now */
-      create_user.uid = conf->ps_uid;
+      create_user.user = conf->user;
       create_user.max_buckets = 0; /* unlimited */
       create_user.display_name = "pubsub";
       create_user.generate_key = false;
@@ -198,6 +439,13 @@ public:
         return set_cr_error(retcode);
       }
 
+      get_user_info.user = conf->user;
+      yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info));
+      if (retcode < 0) {
+        ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
+        return set_cr_error(retcode);
+      }
+
       return set_cr_done();
     }
     return 0;
@@ -205,17 +453,17 @@ public:
 };
 
 class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
-  PSConfigRef conf;
+  PSEnvRef env;
   uint64_t versioned_epoch;
   vector<PSNotificationConfig *> notifs;
   vector<PSNotificationConfig *>::iterator niter;
 public:
   RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
                           RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
-                          PSConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
+                          PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), env(_env),
                                                                                versioned_epoch(_versioned_epoch) {
 #warning this will need to change obviously
-    conf->get_notifs(_bucket_info, _key, &notifs);
+    env->conf->get_notifs(_bucket_info, _key, &notifs);
   }
   int operate() override {
     reenter(this) {
@@ -250,13 +498,13 @@ public:
 };
 
 class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
-  PSConfigRef conf;
+  PSEnvRef env;
   uint64_t versioned_epoch;
 public:
   RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
                         RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
-                        PSConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
-                                                           conf(_conf), versioned_epoch(_versioned_epoch) {
+                        PSEnvRef _env, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+                                                           env(_env), versioned_epoch(_versioned_epoch) {
   }
 
   ~RGWPSHandleRemoteObjCR() override {}
@@ -267,7 +515,7 @@ public:
      * actually be handled. Otherwise we fetch info from remote zone about every object, even
      * if we don't intend to handle it.
      */
-    return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
+    return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch);
   }
 };
 
@@ -276,13 +524,13 @@ class RGWPSRemoveRemoteObjCBCR : public RGWCoroutine {
   RGWBucketInfo bucket_info;
   rgw_obj_key key;
   ceph::real_time mtime;
-  PSConfigRef conf;
+  PSEnvRef env;
 public:
   RGWPSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
                           RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
-                          PSConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+                          PSEnvRef _env) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
                                                         bucket_info(_bucket_info), key(_key),
-                                                        mtime(_mtime), conf(_conf) {}
+                                                        mtime(_mtime), env(_env) {}
   int operate() override {
     reenter(this) {
       ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
@@ -307,20 +555,22 @@ public:
 };
 
 class RGWPSDataSyncModule : public RGWDataSyncModule {
-  PSConfigRef conf;
+  PSEnvRef env;
+  PSConfigRef& conf;
 public:
-  RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : conf(std::make_shared<PSConfig>()) {
-    conf->init(cct, config);
+  RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) {
+    env->init(cct, config);
   }
   ~RGWPSDataSyncModule() override {}
 
   void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
-    conf->init_instance(sync_env->store->get_realm(), instance_id);
+    PSManagerRef mgr = PSManager::get_shared(sync_env, env);
+    env->init_instance(sync_env->store->get_realm(), instance_id, mgr);
   }
 
   RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
     ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
-    return new RGWPSInitConfigCBCR(sync_env, conf);
+    return new RGWPSInitEnvCBCR(sync_env, env);
   }
   RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
     ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
@@ -331,7 +581,7 @@ public:
       return nullptr;
     }
 #endif
-    return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch);
+    return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch);
   }
   RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
     /* versioned and versioned epoch params are useless in the elasticsearch backend case */
@@ -343,7 +593,7 @@ public:
       return nullptr;
     }
 #endif
-    return new RGWPSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
+    return new RGWPSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, env);
   }
   RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {