{
"tenant": <tenant>, # default: <empty>
"uid": <uid>, # default: "pubsub"
- "data_bucket_prefix": <prefix> # default: "pubsub"
+ "data_bucket_prefix": <prefix> # default: "pubsub-"
"data_oid_prefix": <prefix> #
# non-dynamic config
void init(CephContext *cct, const JSONFormattable& config) {
string uid = config["uid"]("pubsub");
user = rgw_user(config["tenant"], uid);
- data_bucket_prefix = config["data_bucket_prefix"]("pubsub");
+ data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
data_oid_prefix = config["data_oid_prefix"];
- /* FIXME: this will be dynamically configured */
for (auto& c : config["notifications"].array()) {
PSNotificationConfig nc;
nc.id = ++max_id;
using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
class PSSubscription {
- PSSubscriptionRef self;
RGWDataSyncEnv *sync_env;
PSEnvRef env;
PSSubConfigRef sub_conf;
}
}
- static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env,
+ template <class C>
+ static PSSubscriptionRef get_shared(RGWDataSyncEnv *_sync_env,
PSEnvRef _env,
- PSSubConfigRef& _sub_conf) {
- auto sub = new PSSubscription(_sync_env, _env, _sub_conf);
- sub->self = std::shared_ptr<PSSubscription>(sub);
- sub->init_cr = new InitCR(_sync_env, sub->self);
+ C& _sub_conf) {
+ auto sub = std::make_shared<PSSubscription>(_sync_env, _env, _sub_conf);
+ sub->init_cr = new InitCR(_sync_env, sub);
sub->init_cr->get();
- return sub->self;
- }
- static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env,
- PSEnvRef _env,
- rgw_pubsub_user_sub_config& _sub_conf) {
- auto sub = new PSSubscription(_sync_env, _env, _sub_conf);
- sub->self = std::shared_ptr<PSSubscription>(sub);
- sub->init_cr = new InitCR(_sync_env, sub->self);
- sub->init_cr->get();
- return sub->self;
+ return sub;
}
int call_init_cr(RGWCoroutine *caller) {
return init_cr->execute(caller);
}
- RGWCoroutine *store_event_cr(EventRef& event) {
- return new StoreEventCR(sync_env, self, event);
+ static RGWCoroutine *store_event_cr(RGWDataSyncEnv *sync_env, PSSubscriptionRef& sub, EventRef& event) {
+ return new StoreEventCR(sync_env, sub, event);
}
friend class InitCR;
class PSManager
{
- PSManagerRef self;
-
RGWDataSyncEnv *sync_env;
PSEnvRef env;
conf(mgr->env->conf) {
}
~GetSubCR() {
- mgr->remove_get_sub(sub_name);
}
int operate() override {
if (owner.empty()) {
if (!conf->find_sub(sub_name, &sub_conf)) {
ldout(sync_env->cct, 0) << "ERROR: could not find subscription config: name=" << sub_name << dendl;
+ mgr->remove_get_sub(owner, sub_name);
return set_cr_error(-ENOENT);
}
&user_sub_conf, empty_on_enoent));
}
if (retcode < 0) {
+ mgr->remove_get_sub(owner, sub_name);
return set_cr_error(retcode);
}
yield (*ref)->call_init_cr(this);
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to init subscription" << dendl;
+ mgr->remove_get_sub(owner, sub_name);
return set_cr_error(retcode);
}
if (owner.empty()) {
mgr->subs[sub_name] = *ref;
}
+ mgr->remove_get_sub(owner, sub_name);
return set_cr_done();
}
}
};
+ string sub_id(const rgw_user& owner, const string& sub_name) {
+ string owner_prefix;
+ if (!owner.empty()) {
+ owner_prefix = owner.to_str() + "/";
+ }
+
+ return owner_prefix + sub_name;
+ }
+
map<string, GetSubCR *> get_subs;
- void remove_get_sub(const string& name) {
- get_subs.erase(name);
+ GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) {
+ return get_subs[sub_id(owner, name)];
}
- bool find_sub_instance(const string& sub_name, PSSubscriptionRef *sub) {
- auto iter = subs.find(sub_name);
+ void remove_get_sub(const rgw_user& owner, const string& name) {
+ get_subs.erase(sub_id(owner, name));
+ }
+
+ bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) {
+ auto iter = subs.find(sub_id(owner, sub_name));
if (iter != subs.end()) {
*sub = iter->second;
return true;
env(_env) {}
public:
- static PSManagerRef& get_shared(RGWDataSyncEnv *_sync_env,
+ static PSManagerRef get_shared(RGWDataSyncEnv *_sync_env,
PSEnvRef _env) {
- auto mgr = new PSManager(_sync_env, _env);
- mgr->self = std::shared_ptr<PSManager>(mgr);
- return mgr->self;
+ return std::shared_ptr<PSManager>(new PSManager(_sync_env, _env));
}
- int call_get_subscription_cr(RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
- string owner_prefix;
- if (!owner.empty()) {
- owner_prefix = owner.to_str() + "/";
- }
-
- string sub_instance = owner_prefix + sub_name;
- if (find_sub_instance(sub_instance, ref)) {
+ static int call_get_subscription_cr(RGWDataSyncEnv *sync_env, PSManagerRef& mgr, RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
+ if (mgr->find_sub_instance(owner, sub_name, ref)) {
/* found it! nothing to execute */
ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl;
}
- auto& gs = get_subs[sub_instance];
+ auto& gs = mgr->get_get_subs(owner, sub_name);
if (!gs) {
ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl;
- gs = new GetSubCR(sync_env, self, owner, sub_name, ref);
+ gs = new GetSubCR(sync_env, mgr, owner, sub_name, ref);
}
ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl;
return gs->execute(caller, ref);
/*
* once for the global subscriptions, once for the user specific subscriptions
*/
- yield env->manager->call_get_subscription_cr(this, *oiter, *siter, &sub);
+ yield PSManager::call_get_subscription_cr(sync_env, env->manager, this, *oiter, *siter, &sub);
if (retcode < 0) {
ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl;
continue;
}
- yield call(sub->store_event_cr(event));
+ yield call(PSSubscription::store_event_cr(sync_env, sub, event));
if (retcode < 0) {
ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
continue;
env->init_instance(sync_env->store->get_realm(), instance_id, mgr);
}
- RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
- ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
+ RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override {
+ ldout(sync_env->cct, 5) << conf->id << ": start" << dendl;
return new RGWPSInitEnvCBCR(sync_env, env);
}
RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {