string data_bucket_name;
string data_oid_prefix;
+ void from_user_conf(const rgw_pubsub_user_sub_config& uc) {
+ name = uc.name;
+ topic = uc.topic;
+ push_endpoint = uc.dest.push_endpoint;
+ data_bucket_name = uc.dest.bucket_name;
+ data_oid_prefix = uc.dest.oid_prefix;
+ }
+
void dump(Formatter *f) const {
encode_json("name", name, f);
encode_json("topic", topic, f);
};
public:
-
PSSubscription(RGWDataSyncEnv *_sync_env,
PSEnvRef _env,
PSSubConfigRef& _sub_conf) : sync_env(_sync_env),
env(_env),
sub_conf(_sub_conf),
data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
+
+ PSSubscription(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env,
+ rgw_pubsub_user_sub_config& user_sub_conf) : sync_env(_sync_env),
+ env(_env),
+ data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
+ sub_conf->from_user_conf(user_sub_conf);
+ }
virtual ~PSSubscription() {
if (init_cr) {
init_cr->put();
}
}
-public:
+
static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env,
PSEnvRef _env,
PSSubConfigRef& _sub_conf) {
sub->init_cr->get();
return sub->self;
}
+ static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env,
+ rgw_pubsub_user_sub_config& _sub_conf) {
+ auto sub = new PSSubscription(_sync_env, _env, _sub_conf);
+ sub->self = std::shared_ptr<PSSubscription>(sub);
+ sub->init_cr = new InitCR(_sync_env, sub->self);
+ sub->init_cr->get();
+ return sub->self;
+ }
int call_init_cr(RGWCoroutine *caller) {
return init_cr->execute(caller);
class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
RGWDataSyncEnv *sync_env;
PSManagerRef mgr;
- const string& sub_name;
+ rgw_user owner;
+ string sub_name;
+ string sub_id;
PSSubscriptionRef *ref;
- PSConfigRef& conf;
+ PSConfigRef conf;
PSSubConfigRef sub_conf;
+ rgw_pubsub_user_sub_config user_sub_conf;
public:
GetSubCR(RGWDataSyncEnv *_sync_env,
PSManagerRef& _mgr,
+ const rgw_user& _owner,
const string& _sub_name,
PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sync_env->cct),
sync_env(_sync_env),
mgr(_mgr),
+ owner(_owner),
sub_name(_sub_name),
ref(_ref),
conf(mgr->env->conf) {
int operate() override {
reenter(this) {
- if (!conf->find_sub(sub_name, &sub_conf)) {
- ldout(sync_env->cct, 0) << "ERROR: could not find subscription config: name=" << sub_name << dendl;
- return set_cr_error(-ENOENT);
- }
+ if (owner.empty()) {
+ if (!conf->find_sub(sub_name, &sub_conf)) {
+ ldout(sync_env->cct, 0) << "ERROR: could not find subscription config: name=" << sub_name << dendl;
+ return set_cr_error(-ENOENT);
+ }
- *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
+ *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
+ } else {
+ using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_sub_config>;
+ yield {
+ RGWUserPubSub ups(sync_env->store, owner);
+ rgw_raw_obj obj;
+ ups.get_sub_meta_obj(sub_name, &obj);
+ bool empty_on_enoent = false;
+ call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
+ obj,
+ &user_sub_conf, empty_on_enoent));
+ }
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ *ref = PSSubscription::get_shared(sync_env, mgr->env, user_sub_conf);
+ }
yield (*ref)->call_init_cr(this);
if (retcode < 0) {
return set_cr_error(retcode);
}
- mgr->subs[sub_name] = *ref;
+ if (owner.empty()) {
+ mgr->subs[sub_name] = *ref;
+ }
return set_cr_done();
}
return mgr->self;
}
- int call_get_subscription_cr(RGWCoroutine *caller, const string& sub_name, PSSubscriptionRef *ref) {
- if (find_sub_instance(sub_name, ref)) {
+ int call_get_subscription_cr(RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
+ string owner_prefix;
+ if (!owner.empty()) {
+ owner_prefix = owner.to_str() + "/";
+ }
+
+ string sub_instance = owner_prefix + sub_name;
+ if (find_sub_instance(sub_instance, ref)) {
/* found it! nothing to execute */
ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl;
- return 0;
}
- auto& gs = get_subs[sub_name];
+ auto& gs = get_subs[sub_instance];
if (!gs) {
ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl;
- gs = new GetSubCR(sync_env, self, sub_name, ref);
+ gs = new GetSubCR(sync_env, self, owner, sub_name, ref);
}
ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl;
return gs->execute(caller, ref);
public:
RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env,
PSEnvRef& _env,
- const RGWBucketInfo& _bucket_info,
+ const rgw_user& _owner,
+ const rgw_bucket& _bucket,
const rgw_obj_key& _key,
TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
env(_env),
- owner(_bucket_info.owner),
- bucket(_bucket_info.bucket),
+ owner(_owner),
+ bucket(_bucket),
key(_key),
ups(_sync_env->store, owner),
topics(_topics) {
class RGWPSHandleObjEvent : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
PSEnvRef env;
+ std::array<rgw_user, 2> owners;
+ rgw_user& owner;
+ rgw_user& no_owner;
+ std::array<rgw_user, 2>::iterator oiter;
EventRef event;
vector<PSTopicConfigRef>::iterator titer;
public:
RGWPSHandleObjEvent(RGWDataSyncEnv *_sync_env,
PSEnvRef _env,
+ const rgw_user& _owner,
EventRef& _event,
TopicsRef& _topics) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
env(_env),
+ owner(owners[0]),
+ no_owner(owners[1]),
event(_event),
topics(_topics) {
+ owner = _owner;
}
int operate() override {
reenter(this) {
for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl;
- yield env->manager->call_get_subscription_cr(this, *siter, &sub);
- if (retcode < 0) {
- ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl;
- continue;
- }
+ for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
+ /*
+ * once for the global subscriptions, once for the user specific subscriptions
+ */
+ yield env->manager->call_get_subscription_cr(this, *oiter, *siter, &sub);
+ if (retcode < 0) {
+ ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl;
+ continue;
+ }
- yield call(sub->store_event_cr(event));
- if (retcode < 0) {
- ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
- continue;
+ yield call(sub->store_event_cr(event));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+ continue;
+ }
}
#warning push notification
}
}
- yield call(new RGWPSHandleObjEvent(sync_env, env, event, topics));
+ yield call(new RGWPSHandleObjEvent(sync_env, env, bucket_info.owner, event, topics));
if (retcode < 0) {
return set_cr_error(retcode);
}
int operate() override {
reenter(this) {
- yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info, key, &topics));
+ yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner, bucket_info.bucket, key, &topics));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
return set_cr_error(retcode);
class RGWPSGenericObjEventCBCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
PSEnvRef env;
- RGWBucketInfo bucket_info;
+ rgw_user owner;
+ rgw_bucket bucket;
rgw_obj_key key;
ceph::real_time mtime;
RGWPubSubEventType event_type;
RGWPubSubEventType _event_type) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
env(_env),
- bucket_info(_bucket_info), key(_key),
+ owner(_bucket_info.owner),
+ bucket(_bucket_info.bucket),
+ key(_key),
mtime(_mtime), event_type(_event_type) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
- yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info, key, &topics));
+ << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
+ yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, &topics));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
return set_cr_error(retcode);
}
if (topics->empty()) {
- ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl;
+ ldout(sync_env->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
return set_cr_done();
}
make_event_ref(&event);
event->event = event_type;
- event->bucket = bucket_info.bucket;
+ event->bucket = bucket;
event->key = key;
event->mtime = mtime;
event->timestamp = real_clock::now();
- yield call(new RGWPSHandleObjEvent(sync_env, env, event, topics));
+ yield call(new RGWPSHandleObjEvent(sync_env, env, owner, event, topics));
if (retcode < 0) {
return set_cr_error(retcode);
}