string topic;
string push_endpoint;
- void init(CephContext *cct, const JSONFormattable& config) {
+ string data_bucket_name;
+ string data_prefix;
+
+ void dump(Formatter *f) const {
+ encode_json("name", name, f);
+ encode_json("topic", topic, f);
+ encode_json("push_endpoint", push_endpoint, f);
+ encode_json("data_bucket_name", data_bucket_name, f);
+ encode_json("data_oid_prefix", data_oid_prefix, f);
+ }
+
+ void init(CephContext *cct, const JSONFormattable& config,
+ const string& data_bucket_prefix) {
name = config["name"];
topic = config["topic"];
push_endpoint = config["push_endpoint"];
+ string default_bucket_name = data_prefix + name;
+ data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
+ data_prefix = config["data_prefix"];
}
};
+using PSSubConfigRef = std::shared_ptr<PSSubConfig>;
+
struct PSTopicConfig {
string name;
+ set<string> subs;
};
struct PSNotificationConfig {
struct PSConfig {
string id{"pubsub"};
- string ps_uid{"pubsub"};
+ rgw_user user;
+ string data_bucket_prefix;
+
uint64_t sync_instance{0};
uint32_t num_pub_shards{0};
uint32_t num_topic_shards{0};
uint64_t max_id{0};
+
/* FIXME: no hard coded buckets, we'll have configurable topics */
- vector<PSSubConfig> subscriptions;
+ map<string, PSSubConfigRef> subs;
map<string, PSTopicConfig> topics;
multimap<string, PSNotificationConfig> notifications;
void init(CephContext *cct, const JSONFormattable& config) {
- ps_uid = config["pubsub"];
+ string uid = config["uid"]("pubsub");
+ user = rgw_user(config["tenant"], uid);
+ data_bucket_prefix = config["data_bucket_prefix"]("pubsub");
+
num_pub_shards = config["num_pub_shards"](PS_NUM_PUB_SHARDS_DEFAULT);
if (num_pub_shards < PS_NUM_PUB_SHARDS_MIN) {
num_pub_shards = PS_NUM_PUB_SHARDS_MIN;
topics[nc.topic] = topic_config;
}
for (auto& c : config["subscriptions"].array()) {
- PSSubConfig sc;
- sc.init(cct, c);
- subscriptions.push_back(sc);
+ auto sc = std::make_shared<PSSubConfig>();
+ sc->init(cct, c, data_bucket_prefix);
+ subs[sc->name] = sc;
+ topics[sc->topic].subs.insert(sc->name);
}
ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
notifs->push_back(target);
} while (iter != notifications.begin());
}
+
+ bool find_sub(const string& name, PSSubConfigRef *ref) {
+ auto iter = subs.find(name);
+ if (iter != subs.end()) {
+ *ref = iter->second;
+ return true;
+ }
+ return false;
+ }
};
using PSConfigRef = std::shared_ptr<PSConfig>;
+using EventRef = std::shared_ptr<rgw_pubsub_event>;
-class RGWPSInitConfigCBCR : public RGWCoroutine {
- RGWDataSyncEnv *sync_env;
+class PSManager;
+using PSManagerRef = std::shared_ptr<PSManager>;
+
+struct PSEnv {
PSConfigRef conf;
+ shared_ptr<RGWUserInfo> data_user_info;
+ PSManagerRef manager;
+
+ PSEnv() : conf(make_shared<PSConfig>()),
+ data_user_info(make_shared<RGWUserInfo>()) {}
+
+ void init(CephContext *cct, const JSONFormattable& config) {
+ conf->init(cct, config);
+ }
+
+ void init_instance(RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr);
+};
+
+using PSEnvRef = std::shared_ptr<PSEnv>;
+
+class PSSubscription : public RefCountedObject {
+ RGWDataSyncEnv *sync_env;
+ PSEnvRef env;
+ PSSubConfigRef sub_conf;
+ shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
+ RGWBucketInfo *bucket_info{nullptr};
+
+public:
+
+ class InitCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ PSSubscriptionRef sub;
+ rgw_get_bucket_info_params get_bucket_info;
+ rgw_bucket_create_local_params create_bucket;
+ PSConfigRef& conf;
+ PSSubConfigRef& sub_conf;
+ int i;
+ public:
+ InitCR(RGWDataSyncEnv *_sync_env,
+ PSSubscriptionRef& _sub) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ sub(_sub), conf(sub->env->conf),
+ sub_conf(sub->sub_conf) {
+ }
+
+ int operate() override {
+ reenter(this) {
+ get_bucket_info.tenant = conf->user.tenant;
+ get_bucket_info.bucket_name = sub_conf->data_bucket_name;
+ sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>();
+
+ for (i = 0; i < 2; ++i) {
+ yield call(new RGWGetBucketInfoCR(sync_env->async_rados,
+ sync_env->store,
+ get_bucket_info,
+ sub->get_bucket_info_result));
+ if (retcode < 0 && retcode != -ENOENT) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to geting bucket info: " << "tenant="
+ << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
+ }
+ if (retcode == 0) {
+ sub->bucket_info = &sub->get_bucket_info_result->bucket_info;
+ return set_cr_done();
+ }
+
+ create_bucket.user_info = sub->env->data_user_info;
+ create_bucket.bucket_name = sub_conf->data_bucket_name;
+ ldout(sync_env->cct, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
+ yield call(new RGWBucketCreateLocalCR(sync_env->async_rados,
+ sync_env->store,
+ create_bucket));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to create bucket: " << "tenant="
+ << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ldout(sync_env->cct, 20) << "pubsub: bucket create: after user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
+
+ /* second iteration: we got -ENOENT and created a bucket */
+ }
+
+ /* failed twice on -ENOENT, unexpected */
+ ldout(sync_env->cct, 0) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
+ << " name=" << get_bucket_info.bucket_name << dendl;
+ return set_cr_error(-EIO);
+ }
+ return 0;
+ }
+ };
+
+
+public:
+ PSSubscription(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env,
+ PSSubConfigRef& _sub_conf) : sync_env(_sync_env),
+ env(_env),
+ sub_conf(_sub_conf) {}
+
+ RGWCoroutine *init_cr() {
+ return new InitCR(sync_env, this);
+ }
+
+ RGWCoroutine *store_event_cr(EventRef event);
+
+ friend class InitCR;
+};
+
+using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
+
+
+RGWCoroutine *PSSubscription::store_event_cr(EventRef event)
+{
+ /* FIXME */
+}
+
+
+class PSManager : public RefCountedObject
+{
+ RGWDataSyncEnv *sync_env;
+ PSEnvRef env;
+
+ map<string, PSSubscriptionRef> subs;
+
+ class GetSubCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ PSManagerRef mgr;
+ const string& sub_name;
+ PSSubscriptionRef *ref;
+
+ PSConfigRef& conf;
+
+ PSSubConfigRef sub_conf;
+ public:
+ GetSubCR(RGWDataSyncEnv *_sync_env,
+ PSManagerRef& _mgr,
+ const string& _sub_name,
+ PSSubscriptionRef *_ref) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ mgr(_mgr),
+ sub_name(_sub_name),
+ ref(_ref),
+ conf(mgr->env->conf) {
+ }
+
+ int operate() override {
+ reenter(this) {
+ if (!conf->find_sub(sub_name, &sub_conf)) {
+ ldout(sync_env->cct, 0) << "ERROR: could not find subscription config: name=" << sub_name << dendl;
+ return set_cr_error(-ENOENT);
+ }
+
+ *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
+
+ yield call((*ref)->init_cr());
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to init subscription" << dendl;
+ ref->reset();
+ return set_cr_error(retcode);
+ }
+
+ mgr->subs[sub_name] = *ref;
+ return set_cr_done();
+ }
+ return 0;
+ }
+ };
+
+ bool find_sub_instance(const string& sub_name, PSSubscriptionRef *sub) {
+ auto iter = subs.find(sub_name);
+ if (iter != subs.end()) {
+ *sub = iter->second;
+ return true;
+ }
+ return false;
+ }
+
+ PSManager(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env) : sync_env(_sync_env),
+ env(_env) {}
+
+public:
+ static PSManagerRef& get_shared(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env) {
+ auto mgr = new PSManager(_sync_env, _env);
+ mgr->self = std::shared_ptr<PSManager>(mgr);
+ return mgr->self;
+ }
+
+ RGWCoroutine *get_subscription_cr(const string& sub_name, PSSubscriptionRef *ref) {
+ if (find_sub_instance(sub_name, ref)) {
+ /* found it! nothing to execute */
+ return nullptr;
+ }
+ return new GetSubCR(sync_env, self, sub_name, ref);
+ }
+
+ friend class GetSubCR;
+};
+
+void PSEnv::init_instance(RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) {
+ manager = mgr;
+ conf->init_instance(realm, instance_id);
+}
+
+class RGWPSInitEnvCBCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ PSEnvRef env;
+ PSConfigRef& conf;
rgw_user_create_params create_user;
+ rgw_get_user_info_params get_user_info;
public:
- RGWPSInitConfigCBCR(RGWDataSyncEnv *_sync_env,
- PSConfigRef _conf) : RGWCoroutine(_sync_env->cct),
+ RGWPSInitEnvCBCR(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
- conf(_conf) {}
+ env(_env), conf(env->conf) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 0) << ": init pubsub config zone=" << sync_env->source_zone << dendl;
/* nothing to do here right now */
- create_user.uid = conf->ps_uid;
+ create_user.user = conf->user;
create_user.max_buckets = 0; /* unlimited */
create_user.display_name = "pubsub";
create_user.generate_key = false;
return set_cr_error(retcode);
}
+ get_user_info.user = conf->user;
+ yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info));
+ if (retcode < 0) {
+ ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+
return set_cr_done();
}
return 0;
};
class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
- PSConfigRef conf;
+ PSEnvRef env;
uint64_t versioned_epoch;
vector<PSNotificationConfig *> notifs;
vector<PSNotificationConfig *>::iterator niter;
public:
RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- PSConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
+ PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), env(_env),
versioned_epoch(_versioned_epoch) {
#warning this will need to change obviously
- conf->get_notifs(_bucket_info, _key, ¬ifs);
+ env->conf->get_notifs(_bucket_info, _key, ¬ifs);
}
int operate() override {
reenter(this) {
};
class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
- PSConfigRef conf;
+ PSEnvRef env;
uint64_t versioned_epoch;
public:
RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- PSConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
- conf(_conf), versioned_epoch(_versioned_epoch) {
+ PSEnvRef _env, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+ env(_env), versioned_epoch(_versioned_epoch) {
}
~RGWPSHandleRemoteObjCR() override {}
* actually be handled. Otherwise we fetch info from remote zone about every object, even
* if we don't intend to handle it.
*/
- return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
+ return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch);
}
};
RGWBucketInfo bucket_info;
rgw_obj_key key;
ceph::real_time mtime;
- PSConfigRef conf;
+ PSEnvRef env;
public:
RGWPSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
- PSConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ PSEnvRef _env) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
bucket_info(_bucket_info), key(_key),
- mtime(_mtime), conf(_conf) {}
+ mtime(_mtime), env(_env) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
};
class RGWPSDataSyncModule : public RGWDataSyncModule {
- PSConfigRef conf;
+ PSEnvRef env;
+ PSConfigRef& conf;
public:
- RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : conf(std::make_shared<PSConfig>()) {
- conf->init(cct, config);
+ RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) {
+ env->init(cct, config);
}
~RGWPSDataSyncModule() override {}
void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
- conf->init_instance(sync_env->store->get_realm(), instance_id);
+ PSManagerRef mgr = PSManager::get_shared(sync_env, env);
+ env->init_instance(sync_env->store->get_realm(), instance_id, mgr);
}
RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
- return new RGWPSInitConfigCBCR(sync_env, conf);
+ return new RGWPSInitEnvCBCR(sync_env, env);
}
RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
return nullptr;
}
#endif
- return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch);
+ return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch);
}
RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
/* versioned and versioned epoch params are useless in the elasticsearch backend case */
return nullptr;
}
#endif
- return new RGWPSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
+ return new RGWPSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, env);
}
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {