From 23ef624568ca6371daaac1d35b46b4bcb0536bce Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Sun, 1 Jul 2018 18:07:58 -0700 Subject: [PATCH] rgw: pubsub: more events coverage Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_sync_module_pubsub.cc | 125 +++++++++++++++++++----------- 1 file changed, 80 insertions(+), 45 deletions(-) diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 23e5d14c8e8..fbd3eb38906 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -236,8 +236,8 @@ struct PSConfig { sync_instance = instance_id; } - void get_topics(CephContext *cct, const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector *result) { - string path = bucket_info.bucket.name + "/" + key.name; + void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, vector *result) { + string path = bucket.name + "/" + key.name; result->clear(); @@ -267,7 +267,7 @@ struct PSConfig { continue; } - ldout(cct, 10) << ": found topic for path=" << bucket_info.bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl; + ldout(cct, 10) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl; result->push_back(&topic->second); } while (iter != notifications.begin()); } @@ -331,6 +331,8 @@ public: etag = etag.substr(0, 8); snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), etag.c_str()); + event->id = buf; + return buf; } @@ -522,6 +524,7 @@ class PSSubscription { PSConfigRef& conf; PSSubConfigRef& sub_conf; rgw_object_simple_put_params put_obj; + string oid_prefix; int i; public: StoreEventCR(RGWDataSyncEnv *_sync_env, @@ -532,13 +535,14 @@ class PSSubscription { pse(_event), conf(sub->env->conf), sub_conf(sub->sub_conf) { + oid_prefix = sub->sub_conf->data_oid_prefix; } int operate() override { reenter(this) { put_obj.bucket = sub->bucket; - put_obj.key = rgw_obj_key(pse.generate_message_id()); + put_obj.key = rgw_obj_key(oid_prefix + pse.generate_message_id()); pse.format(&put_obj.data); @@ -748,42 +752,34 @@ public: } }; - -class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { +class RGWPSHandleObjEvent : public RGWCoroutine { RGWDataSyncEnv *sync_env; PSEnvRef env; - uint64_t versioned_epoch; + EventRef event; + vector topics; vector::iterator titer; set::iterator siter; PSSubscriptionRef sub; - EventRef event; public: - RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), - sync_env(_sync_env), - env(_env), - versioned_epoch(_versioned_epoch) { -#warning this will need to change obviously - env->conf->get_topics(sync_env->cct, _bucket_info, _key, &topics); + RGWPSHandleObjEvent(RGWDataSyncEnv *_sync_env, + PSEnvRef _env, + EventRef& _event) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + env(_env), + event(_event) { +#warning this will need to change + env->conf->get_topics(sync_env->cct, event->bucket, event->key, &topics); } int operate() override { reenter(this) { - ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone - << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime - << " attrs=" << attrs << dendl; - make_event_ref(&event); - event->bucket = bucket_info.bucket; - event->key = key; - event->event = OBJECT_CREATE; - event->timestamp = real_clock::now(); + ldout(sync_env->cct, 10) << ": handle event: obj: z=" << sync_env->source_zone + << " event=" << json_str("event", *event, false) << dendl; ldout(sync_env->cct, 20) << "pubsub: " << topics.size() << " topics found for path" << dendl; -#warning more event init for (titer = topics.begin(); titer != topics.end(); ++titer) { - ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl; + ldout(sync_env->cct, 10) << ": notification for " << event->bucket << "/" << event->key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl; for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) { ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl; @@ -800,7 +796,7 @@ public: continue; } -#warning publish notification +#warning push notification } if (retcode < 0) { return set_cr_error(retcode); @@ -812,6 +808,42 @@ public: } }; + +class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { + RGWDataSyncEnv *sync_env; + PSEnvRef env; + uint64_t versioned_epoch; + EventRef event; +public: + RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, + RGWBucketInfo& _bucket_info, rgw_obj_key& _key, + PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), + sync_env(_sync_env), + env(_env), + versioned_epoch(_versioned_epoch) { + } + int operate() override { + reenter(this) { + ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone + << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime + << " attrs=" << attrs << dendl; + make_event_ref(&event); + event->bucket = bucket_info.bucket; + event->key = key; + event->mtime = mtime; + event->event = OBJECT_CREATE; + event->timestamp = real_clock::now(); + + yield call(new RGWPSHandleObjEvent(sync_env, env, event)); + if (retcode < 0) { + return set_cr_error(retcode); + } + return set_cr_done(); + } + return 0; + } +}; + class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { PSEnvRef env; uint64_t versioned_epoch; @@ -834,31 +866,35 @@ public: } }; -class RGWPSRemoveRemoteObjCBCR : public RGWCoroutine { +class RGWPSGenericObjEventCBCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; + PSEnvRef env; RGWBucketInfo bucket_info; rgw_obj_key key; ceph::real_time mtime; - PSEnvRef env; + RGWPubSubEventType event_type; + EventRef event; public: - RGWPSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, - PSEnvRef _env) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_info(_bucket_info), key(_key), - mtime(_mtime), env(_env) {} + RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env, + PSEnvRef _env, + RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, + RGWPubSubEventType _event_type) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + env(_env), + bucket_info(_bucket_info), key(_key), + mtime(_mtime), event_type(_event_type) {} int operate() override { reenter(this) { ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; - yield { -#if 0 - string path = conf->get_obj_path(bucket_info, key); + make_event_ref(&event); + event->event = event_type; + event->bucket = bucket_info.bucket; + event->key = key; + event->mtime = mtime; + event->timestamp = real_clock::now(); - call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(), - sync_env->http_manager, - path, nullptr /* params */)); -#endif - } + yield call(new RGWPSHandleObjEvent(sync_env, env, event)); if (retcode < 0) { return set_cr_error(retcode); } @@ -899,7 +935,6 @@ public: return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch); } RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - /* versioned and versioned epoch params are useless in the elasticsearch backend case */ ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; #warning this should be done correctly #if 0 @@ -908,7 +943,7 @@ public: return nullptr; } #endif - return new RGWPSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, env); + return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, OBJECT_DELETE); } RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { @@ -919,7 +954,7 @@ public: ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl; #endif #warning delete markers need to be handled too - return NULL; + return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, DELETE_MARKER_CREATE); } }; -- 2.39.5