sync_instance = instance_id;
}
- void get_notifs(const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector<PSNotificationConfig *> *notifs) {
+ void get_topics(CephContext *cct, const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector<PSTopicConfig *> *result) {
string path = bucket_info.bucket.name + "/" + key.name;
- notifs->clear();
+ result->clear();
auto iter = notifications.upper_bound(path);
if (iter == notifications.begin()) {
break;
}
- PSNotificationConfig *target = &iter->second;
+ PSNotificationConfig& target = iter->second;
- if (!target->is_prefix &&
+ if (!target.is_prefix &&
path.size() != iter->first.size()) {
continue;
}
- notifs->push_back(target);
+ auto topic = topics.find(target.topic);
+ if (topic == topics.end()) {
+ continue;
+ }
+
+ ldout(cct, 10) << ": found topic for path=" << bucket_info.bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl;
+ result->push_back(&topic->second);
} while (iter != notifications.begin());
}
using PSConfigRef = std::shared_ptr<PSConfig>;
using EventRef = std::shared_ptr<rgw_pubsub_event>;
+static void make_event_ref(EventRef *event) {
+ *event = std::make_shared<rgw_pubsub_event>();
+}
+
class PSManager;
using PSManagerRef = std::shared_ptr<PSManager>;
};
class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+ RGWDataSyncEnv *sync_env;
PSEnvRef env;
uint64_t versioned_epoch;
- vector<PSNotificationConfig *> notifs;
- vector<PSNotificationConfig *>::iterator niter;
+ vector<PSTopicConfig *> topics;
+ vector<PSTopicConfig *>::iterator titer;
+ set<string>::iterator siter;
+ PSSubscriptionRef sub;
+ EventRef event;
public:
RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), env(_env),
- versioned_epoch(_versioned_epoch) {
+ PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+ sync_env(_sync_env),
+ env(_env),
+ versioned_epoch(_versioned_epoch) {
#warning this will need to change obviously
- env->conf->get_notifs(_bucket_info, _key, ¬ifs);
+ env->conf->get_topics(sync_env->cct, _bucket_info, _key, &topics);
}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
<< " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
<< " attrs=" << attrs << dendl;
+ make_event_ref(&event);
+ event->bucket = bucket_info.bucket;
+ event->key = key;
+ event->event = OBJECT_CREATE;
+ event->timestamp = real_clock::now();
+ ldout(sync_env->cct, 20) << "pubsub: " << topics.size() << " topics found for path" << dendl;
+#warning more event init
- for (niter = notifs.begin(); niter != notifs.end(); ++niter) {
- yield {
- ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": id=" << (*niter)->id << " path=" << (*niter)->path << ", topic=" << (*niter)->topic << dendl;
+ for (titer = topics.begin(); titer != topics.end(); ++titer) {
+ ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
-#warning publish notification
-#if 0
- string path = conf->get_obj_path(bucket_info, key);
- es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
+ for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
+ ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl;
- call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
- sync_env->http_manager,
- path, nullptr /* params */,
- doc, nullptr /* result */));
-#endif
+ yield call(env->manager->get_subscription_cr(*siter, &sub));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl;
+ continue;
+ }
+
+ yield call(sub->store_event_cr(event));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+ continue;
+ }
+
+#warning publish notification
}
if (retcode < 0) {
return set_cr_error(retcode);