sync_instance = instance_id;
}
- void get_topics(CephContext *cct, const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector<PSTopicConfig *> *result) {
- string path = bucket_info.bucket.name + "/" + key.name;
+ void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, vector<PSTopicConfig *> *result) {
+ string path = bucket.name + "/" + key.name;
result->clear();
continue;
}
- ldout(cct, 10) << ": found topic for path=" << bucket_info.bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl;
+ ldout(cct, 10) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl;
result->push_back(&topic->second);
} while (iter != notifications.begin());
}
etag = etag.substr(0, 8);
snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), etag.c_str());
+ event->id = buf;
+
return buf;
}
PSConfigRef& conf;
PSSubConfigRef& sub_conf;
rgw_object_simple_put_params put_obj;
+ string oid_prefix;
int i;
public:
StoreEventCR(RGWDataSyncEnv *_sync_env,
pse(_event),
conf(sub->env->conf),
sub_conf(sub->sub_conf) {
+ oid_prefix = sub->sub_conf->data_oid_prefix;
}
int operate() override {
reenter(this) {
put_obj.bucket = sub->bucket;
- put_obj.key = rgw_obj_key(pse.generate_message_id());
+ put_obj.key = rgw_obj_key(oid_prefix + pse.generate_message_id());
pse.format(&put_obj.data);
}
};
-
-class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+class RGWPSHandleObjEvent : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
PSEnvRef env;
- uint64_t versioned_epoch;
+ EventRef event;
+
vector<PSTopicConfig *> topics;
vector<PSTopicConfig *>::iterator titer;
set<string>::iterator siter;
PSSubscriptionRef sub;
- EventRef event;
public:
- RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
- sync_env(_sync_env),
- env(_env),
- versioned_epoch(_versioned_epoch) {
-#warning this will need to change obviously
- env->conf->get_topics(sync_env->cct, _bucket_info, _key, &topics);
+ RGWPSHandleObjEvent(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env,
+ EventRef& _event) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ env(_env),
+ event(_event) {
+#warning this will need to change
+ env->conf->get_topics(sync_env->cct, event->bucket, event->key, &topics);
}
int operate() override {
reenter(this) {
- ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
- << " attrs=" << attrs << dendl;
- make_event_ref(&event);
- event->bucket = bucket_info.bucket;
- event->key = key;
- event->event = OBJECT_CREATE;
- event->timestamp = real_clock::now();
+ ldout(sync_env->cct, 10) << ": handle event: obj: z=" << sync_env->source_zone
+ << " event=" << json_str("event", *event, false) << dendl;
ldout(sync_env->cct, 20) << "pubsub: " << topics.size() << " topics found for path" << dendl;
-#warning more event init
for (titer = topics.begin(); titer != topics.end(); ++titer) {
- ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
+ ldout(sync_env->cct, 10) << ": notification for " << event->bucket << "/" << event->key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl;
continue;
}
-#warning publish notification
+#warning push notification
}
if (retcode < 0) {
return set_cr_error(retcode);
}
};
+
+class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+ RGWDataSyncEnv *sync_env;
+ PSEnvRef env;
+ uint64_t versioned_epoch;
+ EventRef event;
+public:
+ RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
+ RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+ PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+ sync_env(_sync_env),
+ env(_env),
+ versioned_epoch(_versioned_epoch) {
+ }
+ int operate() override {
+ reenter(this) {
+ ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
+ << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+ << " attrs=" << attrs << dendl;
+ make_event_ref(&event);
+ event->bucket = bucket_info.bucket;
+ event->key = key;
+ event->mtime = mtime;
+ event->event = OBJECT_CREATE;
+ event->timestamp = real_clock::now();
+
+ yield call(new RGWPSHandleObjEvent(sync_env, env, event));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
PSEnvRef env;
uint64_t versioned_epoch;
}
};
-class RGWPSRemoveRemoteObjCBCR : public RGWCoroutine {
+class RGWPSGenericObjEventCBCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
+ PSEnvRef env;
RGWBucketInfo bucket_info;
rgw_obj_key key;
ceph::real_time mtime;
- PSEnvRef env;
+ RGWPubSubEventType event_type;
+ EventRef event;
public:
- RGWPSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
- PSEnvRef _env) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- bucket_info(_bucket_info), key(_key),
- mtime(_mtime), env(_env) {}
+ RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env,
+ RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
+ RGWPubSubEventType _event_type) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ env(_env),
+ bucket_info(_bucket_info), key(_key),
+ mtime(_mtime), event_type(_event_type) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
<< " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
- yield {
-#if 0
- string path = conf->get_obj_path(bucket_info, key);
+ make_event_ref(&event);
+ event->event = event_type;
+ event->bucket = bucket_info.bucket;
+ event->key = key;
+ event->mtime = mtime;
+ event->timestamp = real_clock::now();
- call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
- sync_env->http_manager,
- path, nullptr /* params */));
-#endif
- }
+ yield call(new RGWPSHandleObjEvent(sync_env, env, event));
if (retcode < 0) {
return set_cr_error(retcode);
}
return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch);
}
RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- /* versioned and versioned epoch params are useless in the elasticsearch backend case */
ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
#warning this should be done correctly
#if 0
return nullptr;
}
#endif
- return new RGWPSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, env);
+ return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, OBJECT_DELETE);
}
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
#endif
#warning delete markers need to be handled too
- return NULL;
+ return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, DELETE_MARKER_CREATE);
}
};