push_endpoint = config["push_endpoint"];
string default_bucket_name = data_bucket_prefix + name;
data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
-#warning use data_oid_prefix for oid generation
data_oid_prefix = config["data_oid_prefix"];
}
};
return ss.str();
}
+using TopicsRef = std::shared_ptr<vector<PSTopicConfig *>>;
+
struct PSConfig {
string id{"pubsub"};
sync_instance = instance_id;
}
- void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, vector<PSTopicConfig *> *result) {
+ void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
string path = bucket.name + "/" + key.name;
- result->clear();
+ (*result)->clear();
auto iter = notifications.upper_bound(path);
if (iter == notifications.begin()) {
}
ldout(cct, 10) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl;
- result->push_back(&topic->second);
+ (*result)->push_back(&topic->second);
} while (iter != notifications.begin());
}
rgw_get_user_info_params get_user_info;
public:
RGWPSInitEnvCBCR(RGWDataSyncEnv *_sync_env,
- PSEnvRef _env) : RGWCoroutine(_sync_env->cct),
+ PSEnvRef& _env) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
env(_env), conf(env->conf) {}
int operate() override {
}
};
+class RGWPSFindBucketTopicsCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ PSEnvRef env;
+ rgw_bucket bucket;
+ rgw_obj_key key;
+
+ TopicsRef *topics;
+public:
+ RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env,
+ PSEnvRef& _env,
+ const rgw_bucket& _bucket,
+ const rgw_obj_key& _key,
+ TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ env(_env),
+ bucket(_bucket),
+ key(_key),
+ topics(_topics) {
+ *topics = std::make_shared<vector<PSTopicConfig *> >();
+ }
+ int operate() override {
+ reenter(this) {
+#warning this will need to change
+ env->conf->get_topics(sync_env->cct, bucket, key, topics);
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
class RGWPSHandleObjEvent : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
PSEnvRef env;
EventRef event;
- vector<PSTopicConfig *> topics;
vector<PSTopicConfig *>::iterator titer;
set<string>::iterator siter;
PSSubscriptionRef sub;
+ TopicsRef topics;
public:
RGWPSHandleObjEvent(RGWDataSyncEnv *_sync_env,
PSEnvRef _env,
- EventRef& _event) : RGWCoroutine(_sync_env->cct),
+ EventRef& _event,
+ TopicsRef& _topics) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
env(_env),
- event(_event) {
-#warning this will need to change
- env->conf->get_topics(sync_env->cct, event->bucket, event->key, &topics);
+ event(_event),
+ topics(_topics) {
}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": handle event: obj: z=" << sync_env->source_zone
<< " event=" << json_str("event", *event, false) << dendl;
- ldout(sync_env->cct, 20) << "pubsub: " << topics.size() << " topics found for path" << dendl;
+ ldout(sync_env->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
- for (titer = topics.begin(); titer != topics.end(); ++titer) {
+ for (titer = topics->begin(); titer != topics->end(); ++titer) {
ldout(sync_env->cct, 10) << ": notification for " << event->bucket << "/" << event->key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
PSEnvRef env;
uint64_t versioned_epoch;
EventRef event;
+ TopicsRef topics;
public:
RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+ PSEnvRef _env, uint64_t _versioned_epoch,
+ TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
sync_env(_sync_env),
env(_env),
- versioned_epoch(_versioned_epoch) {
+ versioned_epoch(_versioned_epoch),
+ topics(_topics) {
}
int operate() override {
reenter(this) {
event->mtime = mtime;
event->event = OBJECT_CREATE;
event->timestamp = real_clock::now();
+ {
+ for (auto& attr : attrs) {
+ string k = attr.first;
+ if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
+ k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
+ }
+ string v = attr.second.to_str();
+ auto p = std::make_pair(k, v);
+ event->attrs.push_back(p);
+ }
+ }
- yield call(new RGWPSHandleObjEvent(sync_env, env, event));
+ yield call(new RGWPSHandleObjEvent(sync_env, env, event, topics));
if (retcode < 0) {
return set_cr_error(retcode);
}
class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
PSEnvRef env;
uint64_t versioned_epoch;
+ TopicsRef topics;
public:
RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- PSEnvRef _env, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
- env(_env), versioned_epoch(_versioned_epoch) {
+ PSEnvRef _env, uint64_t _versioned_epoch,
+ TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+ env(_env), versioned_epoch(_versioned_epoch),
+ topics(_topics) {
}
~RGWPSHandleRemoteObjCR() override {}
RGWStatRemoteObjCBCR *allocate_callback() override {
-#warning things need to change
- /* FIXME: we need to create a pre_callback coroutine that decides whether object should
- * actually be handled. Otherwise we fetch info from remote zone about every object, even
- * if we don't intend to handle it.
- */
- return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch);
+ return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch, topics);
+ }
+};
+
+class RGWPSHandleObjCreateCR : public RGWCoroutine {
+
+ RGWDataSyncEnv *sync_env;
+ RGWBucketInfo bucket_info;
+ rgw_obj_key key;
+ PSEnvRef env;
+ uint64_t versioned_epoch;
+ TopicsRef topics;
+public:
+ RGWPSHandleObjCreateCR(RGWDataSyncEnv *_sync_env,
+ RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+ PSEnvRef _env, uint64_t _versioned_epoch) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ bucket_info(_bucket_info),
+ key(_key),
+ env(_env),
+ versioned_epoch(_versioned_epoch) {
+ }
+
+ ~RGWPSHandleObjCreateCR() override {}
+
+ int operate() override {
+ reenter(this) {
+ yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.bucket, key, &topics));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ if (topics->empty()) {
+ ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl;
+ return set_cr_done();
+ }
+ yield call(new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch, topics));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
}
};
ceph::real_time mtime;
RGWPubSubEventType event_type;
EventRef event;
+ TopicsRef topics;
public:
RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env,
PSEnvRef _env,
reenter(this) {
ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
<< " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
+ yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.bucket, key, &topics));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ if (topics->empty()) {
+ ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl;
+ return set_cr_done();
+ }
make_event_ref(&event);
event->event = event_type;
event->bucket = bucket_info.bucket;
event->mtime = mtime;
event->timestamp = real_clock::now();
- yield call(new RGWPSHandleObjEvent(sync_env, env, event));
+ yield call(new RGWPSHandleObjEvent(sync_env, env, event, topics));
if (retcode < 0) {
return set_cr_error(retcode);
}
}
RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
-#warning this should be done correctly
-#if 0
- if (!conf->should_handle_operation(bucket_info)) {
- ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
- return nullptr;
- }
-#endif
- return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch);
+ return new RGWPSHandleObjCreateCR(sync_env, bucket_info, key, env, versioned_epoch);
}
RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
-#warning this should be done correctly
-#if 0
- if (!conf->should_handle_operation(bucket_info)) {
- ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
- return nullptr;
- }
-#endif
return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, OBJECT_DELETE);
}
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
-#warning requests should be filtered correctly
-#if 0
- ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
-#endif
-#warning delete markers need to be handled too
return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, DELETE_MARKER_CREATE);
}
};