using PSEnvRef = std::shared_ptr<PSEnv>;
+class PSEvent {
+ EventRef event;
+
+public:
+ PSEvent(EventRef& _event) : event(_event) {}
+
+ string generate_message_id() {
+ char buf[64];
+ utime_t ts(event->timestamp);
+
+ string etag;
+ RGWMD5Etag hash;
+ hash.update(event->bucket.bucket_id);
+ hash.update(event->key.name);
+ hash.update(event->key.instance);
+ hash.finish(&etag);
+
+ assert(etag.size() > 8);
+
+ etag = etag.substr(0, 8);
+ snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), etag.c_str());
+
+ return buf;
+ }
+
+ void format(bufferlist *bl) {
+ stringstream ss;
+ JSONFormatter f;
+
+ encode_json("event", *event, &f);
+ f.flush(ss);
+
+ bl->append(ss.str());
+ }
+
+};
+
class PSSubscription : public RefCountedObject {
RGWDataSyncEnv *sync_env;
PSEnvRef env;
shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
RGWBucketInfo *bucket_info{nullptr};
RGWDataAccessRef data_access;
+ RGWDataAccess::BucketRef bucket;
public:
<< get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
}
if (retcode == 0) {
- sub->bucket_info = &sub->get_bucket_info_result->bucket_info;
+ auto& result = sub->get_bucket_info_result;
+ sub->bucket_info = &result->bucket_info;
+
+ int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
+ if (ret < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
+ return set_cr_error(ret);
+ }
return set_cr_done();
}
}
};
+ class StoreEventCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ PSSubscriptionRef sub;
+ PSEvent pse;
+ PSConfigRef& conf;
+ PSSubConfigRef& sub_conf;
+ rgw_object_simple_put_params put_obj;
+ int i;
+ public:
+ StoreEventCR(RGWDataSyncEnv *_sync_env,
+ PSSubscriptionRef& _sub,
+ EventRef& _event) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ sub(_sub),
+ pse(_event),
+ conf(sub->env->conf),
+ sub_conf(sub->sub_conf) {
+ }
+
+ int operate() override {
+ reenter(this) {
+
+ put_obj.bucket = sub->bucket;
+ put_obj.key = rgw_obj_key(pse.generate_message_id());
+
+ pse.format(&put_obj.data);
+
+ yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
+ sync_env->store,
+ put_obj));
+ }
+ return 0;
+ }
+ };
public:
PSSubscription(RGWDataSyncEnv *_sync_env,
return new InitCR(sync_env, this);
}
- RGWCoroutine *store_event_cr(EventRef event);
+ RGWCoroutine *store_event_cr(EventRef& event) {
+ return new StoreEventCR(sync_env, this, event);
+ }
friend class InitCR;
};
using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
-RGWCoroutine *PSSubscription::store_event_cr(EventRef event)
-{
- /* FIXME */
-}
-
-
class PSManager : public RefCountedObject
{
RGWDataSyncEnv *sync_env;