From ca1d81d0acb472a736a5d9d394f4ecf4f8871111 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Sun, 6 Dec 2020 16:26:05 +0200 Subject: [PATCH] rgw/notification: use the term "event" for notifications as well as some other nameing changes for clarifications Signed-off-by: Yuval Lifshitz --- src/rgw/rgw_notify.cc | 110 +++++++++++++++--------------- src/rgw/rgw_pubsub.cc | 4 +- src/rgw/rgw_pubsub.h | 8 +-- src/rgw/rgw_pubsub_push.cc | 32 ++++----- src/rgw/rgw_pubsub_push.h | 6 +- src/rgw/rgw_sync_module_pubsub.cc | 75 ++++++++++---------- 6 files changed, 117 insertions(+), 118 deletions(-) diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc index 4f41aa420c5e2..29b0c44093ad4 100644 --- a/src/rgw/rgw_notify.cc +++ b/src/rgw/rgw_notify.cc @@ -18,15 +18,15 @@ namespace rgw::notify { -struct record_with_endpoint_t { - rgw_pubsub_s3_record record; +struct event_entry_t { + rgw_pubsub_s3_event event; std::string push_endpoint; std::string push_endpoint_args; std::string arn_topic; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - encode(record, bl); + encode(event, bl); encode(push_endpoint, bl); encode(push_endpoint_args, bl); encode(arn_topic, bl); @@ -35,14 +35,14 @@ struct record_with_endpoint_t { void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); - decode(record, bl); + decode(event, bl); decode(push_endpoint, bl); decode(push_endpoint_args, bl); decode(arn_topic, bl); DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(record_with_endpoint_t) +WRITE_CLASS_ENCODER(event_entry_t) using queues_t = std::set; @@ -149,35 +149,35 @@ class Manager { // processing of a specific entry // return whether processing was successfull (true) or not (false) bool process_entry(const cls_queue_entry& entry, spawn::yield_context yield) { - record_with_endpoint_t record_with_endpoint; + event_entry_t event_entry; auto iter = entry.data.cbegin(); try { - decode(record_with_endpoint, iter); + decode(event_entry, iter); } catch (buffer::error& err) { ldout(cct, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl; return false; } try { // TODO move endpoint creation to queue level - const auto push_endpoint = RGWPubSubEndpoint::create(record_with_endpoint.push_endpoint, record_with_endpoint.arn_topic, - RGWHTTPArgs(record_with_endpoint.push_endpoint_args), + const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic, + RGWHTTPArgs(event_entry.push_endpoint_args), cct); - ldout(cct, 20) << "INFO: push endpoint created: " << record_with_endpoint.push_endpoint << + ldout(cct, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint << " for entry: " << entry.marker << dendl; - const auto ret = push_endpoint->send_to_completion_async(cct, record_with_endpoint.record, optional_yield(io_context, yield)); + const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield)); if (ret < 0) { - ldout(cct, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << record_with_endpoint.push_endpoint + ldout(cct, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint << " failed. error: " << ret << " (will retry)" << dendl; return false; } else { - ldout(cct, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << record_with_endpoint.push_endpoint + ldout(cct, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint << " ok" << dendl; if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); return true; } } catch (const RGWPubSubEndpoint::configuration_error& e) { ldout(cct, 5) << "WARNING: failed to create push endpoint: " - << record_with_endpoint.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl; + << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl; return false; } } @@ -647,56 +647,56 @@ void tags_from_attributes(const req_state* s, rgw::sal::RGWObject* obj, KeyValue } } -// populate record from request -void populate_record_from_request(const req_state *s, +// populate event from request +void populate_event_from_request(const req_state *s, rgw::sal::RGWObject* obj, uint64_t size, const ceph::real_time& mtime, const std::string& etag, EventType event_type, - rgw_pubsub_s3_record& record) { - record.eventTime = mtime; - record.eventName = to_string(event_type); - record.userIdentity = s->user->get_id().id; // user that triggered the change - record.x_amz_request_id = s->req_id; // request ID of the original change - record.x_amz_id_2 = s->host_id; // RGW on which the change was made + rgw_pubsub_s3_event& event) { + event.eventTime = mtime; + event.eventName = to_string(event_type); + event.userIdentity = s->user->get_id().id; // user that triggered the change + event.x_amz_request_id = s->req_id; // request ID of the original change + event.x_amz_id_2 = s->host_id; // RGW on which the change was made // configurationId is filled from notification configuration - record.bucket_name = s->bucket_name; - record.bucket_ownerIdentity = s->bucket_owner.get_id().id; - record.bucket_arn = to_string(rgw::ARN(s->bucket->get_key())); - record.object_key = obj->get_name(); - record.object_size = size; - record.object_etag = etag; - record.object_versionId = obj->get_instance(); + event.bucket_name = s->bucket_name; + event.bucket_ownerIdentity = s->bucket_owner.get_id().id; + event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key())); + event.object_key = obj->get_name(); + event.object_size = size; + event.object_etag = etag; + event.object_versionId = obj->get_instance(); // use timestamp as per key sequence id (hex encoded) const utime_t ts(real_clock::now()); boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), - std::back_inserter(record.object_sequencer)); - set_event_id(record.id, etag, ts); - record.bucket_id = s->bucket->get_bucket_id(); + std::back_inserter(event.object_sequencer)); + set_event_id(event.id, etag, ts); + event.bucket_id = s->bucket->get_bucket_id(); // pass meta data if (s->info.x_meta_map.empty()) { // try to fetch the metadata from the attributes - metadata_from_attributes(s, obj, record.x_meta_map); + metadata_from_attributes(s, obj, event.x_meta_map); } else { - record.x_meta_map = s->info.x_meta_map; + event.x_meta_map = s->info.x_meta_map; } // pass tags if (s->tagset.get_tags().empty()) { // try to fetch the tags from the attributes - tags_from_attributes(s, obj, record.tags); + tags_from_attributes(s, obj, event.tags); } else { - record.tags = s->tagset.get_tags(); + event.tags = s->tagset.get_tags(); } // opaque data will be filled from topic configuration } -bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj, +bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj, EventType event, const RGWObjTags* req_tags) { - if (!::match(filter.events, event)) { + if (!match(filter.events, event)) { return false; } - if (!::match(filter.s3_filter.key_filter, obj->get_name())) { + if (!match(filter.s3_filter.key_filter, obj->get_name())) { return false; } @@ -704,14 +704,14 @@ bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal:: // metadata filter exists if (!s->info.x_meta_map.empty()) { // metadata was cached in req_state - if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) { + if (!match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) { return false; } } else { // try to fetch the metadata from the attributes KeyValueMap metadata; metadata_from_attributes(s, obj, metadata); - if (!::match(filter.s3_filter.metadata_filter, metadata)) { + if (!match(filter.s3_filter.metadata_filter, metadata)) { return false; } } @@ -721,19 +721,19 @@ bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal:: // tag filter exists if (req_tags) { // tags in the request - if (!::match(filter.s3_filter.tag_filter, req_tags->get_tags())) { + if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) { return false; } } else if (!s->tagset.get_tags().empty()) { // tags were cached in req_state - if (!::match(filter.s3_filter.tag_filter, s->tagset.get_tags())) { + if (!match(filter.s3_filter.tag_filter, s->tagset.get_tags())) { return false; } } else { // try to fetch tags from the attributes KeyValueMap tags; tags_from_attributes(s, obj, tags); - if (!::match(filter.s3_filter.tag_filter, tags)) { + if (!match(filter.s3_filter.tag_filter, tags)) { return false; } } @@ -757,8 +757,8 @@ int publish_reserve(EventType event_type, for (const auto& bucket_topic : bucket_topics.topics) { const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second; const rgw_pubsub_topic& topic_cfg = topic_filter.topic; - if (!match(topic_filter, res.s, res.object, event_type, req_tags)) { - // topic does not apply to req_state + if (!notification_match(topic_filter, res.s, res.object, event_type, req_tags)) { + // notification does not apply to req_state continue; } ldout(res.s->cct, 20) << "INFO: notification: '" << topic_filter.s3_id << @@ -808,16 +808,16 @@ int publish_commit(rgw::sal::RGWObject* obj, // nothing to commit or already committed/aborted continue; } - record_with_endpoint_t record_with_endpoint; - populate_record_from_request(res.s, obj, size, mtime, etag, event_type, record_with_endpoint.record); - record_with_endpoint.record.configurationId = topic.configurationId; - record_with_endpoint.record.opaque_data = topic.cfg.opaque_data; + event_entry_t event_entry; + populate_event_from_request(res.s, obj, size, mtime, etag, event_type, event_entry.event); + event_entry.event.configurationId = topic.configurationId; + event_entry.event.opaque_data = topic.cfg.opaque_data; if (topic.cfg.dest.persistent) { - record_with_endpoint.push_endpoint = std::move(topic.cfg.dest.push_endpoint); - record_with_endpoint.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args); - record_with_endpoint.arn_topic = std::move(topic.cfg.dest.arn_topic); + event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint); + event_entry.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args); + event_entry.arn_topic = std::move(topic.cfg.dest.arn_topic); bufferlist bl; - encode(record_with_endpoint, bl); + encode(event_entry, bl); const auto& queue_name = topic.cfg.dest.arn_topic; if (bl.length() > res.size) { // try to make a larger reservation, fail only if this is not possible @@ -872,7 +872,7 @@ int publish_commit(rgw::sal::RGWObject* obj, RGWHTTPArgs(topic.cfg.dest.push_endpoint_args), res.s->cct); ldout(res.s->cct, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl; - const auto ret = push_endpoint->send_to_completion_async(res.s->cct, record_with_endpoint.record, res.s->yield); + const auto ret = push_endpoint->send_to_completion_async(res.s->cct, event_entry.event, res.s->yield); if (ret < 0) { ldout(res.s->cct, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl; if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index d9c850b23985b..adb419deceb8d 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -236,7 +236,7 @@ void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const { do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f); } -void rgw_pubsub_s3_record::dump(Formatter *f) const { +void rgw_pubsub_s3_event::dump(Formatter *f) const { encode_json("eventVersion", eventVersion, f); encode_json("eventSource", eventSource, f); encode_json("awsRegion", awsRegion, f); @@ -935,5 +935,5 @@ void RGWPubSub::SubWithEvents::dump(Formatter* f) const { // explicit instantiation for the only two possible types // no need to move implementation to header template class RGWPubSub::SubWithEvents; -template class RGWPubSub::SubWithEvents; +template class RGWPubSub::SubWithEvents; diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 13b1daa6488e0..038caa3a835cb 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -205,7 +205,7 @@ struct rgw_pubsub_s3_notifications { ] }*/ -struct rgw_pubsub_s3_record { +struct rgw_pubsub_s3_event { constexpr static const char* const json_type_plural = "Records"; std::string eventVersion = "2.2"; // aws:s3 @@ -323,7 +323,7 @@ struct rgw_pubsub_s3_record { void dump(Formatter *f) const; }; -WRITE_CLASS_ENCODER(rgw_pubsub_s3_record) +WRITE_CLASS_ENCODER(rgw_pubsub_s3_event) struct rgw_pubsub_event { constexpr static const char* const json_type_plural = "events"; @@ -357,7 +357,7 @@ struct rgw_pubsub_event { }; WRITE_CLASS_ENCODER(rgw_pubsub_event) -// settign a unique ID for an event/record based on object hash and timestamp +// settign a unique ID for an event based on object hash and timestamp void set_event_id(std::string& id, const std::string& hash, const utime_t& ts); struct rgw_pubsub_sub_dest { @@ -737,7 +737,7 @@ public: if (conf.s3_id.empty()) { return std::make_shared>(this, sub); } - return std::make_shared>(this, sub); + return std::make_shared>(this, sub); } void get_meta_obj(rgw_raw_obj *obj) const; diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index 5e2e26bcbdac0..c6fbe325527ef 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -131,14 +131,14 @@ public: return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl); } - RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { - return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl); + RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override { + return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl); } - int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override { + int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { bufferlist read_bl; RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl); - const auto post_data = json_format_pubsub_event(record); + const auto post_data = json_format_pubsub_event(event); request.set_post_data(post_data); request.set_send_length(post_data.length()); if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); @@ -312,12 +312,12 @@ public: } } - RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { + RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override { ceph_assert(conn); if (ack_level == ack_level_t::None) { - return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record)); + return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); } else { - return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record)); + return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); } } @@ -375,17 +375,17 @@ public: } }; - int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override { + int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { ceph_assert(conn); if (ack_level == ack_level_t::None) { - return amqp::publish(conn, topic, json_format_pubsub_event(record)); + return amqp::publish(conn, topic, json_format_pubsub_event(event)); } else { // TODO: currently broker and routable are the same - this will require different flags but the same mechanism // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine auto w = std::unique_ptr(new Waiter); const auto rc = amqp::publish_with_confirm(conn, topic, - json_format_pubsub_event(record), + json_format_pubsub_event(event), std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); if (rc < 0) { // failed to publish, does not wait for reply @@ -582,12 +582,12 @@ public: } } - RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { + RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override { ceph_assert(conn); if (ack_level == ack_level_t::None) { - return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record)); + return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); } else { - return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record)); + return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); } } @@ -645,16 +645,16 @@ public: } }; - int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override { + int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { ceph_assert(conn); if (ack_level == ack_level_t::None) { - return kafka::publish(conn, topic, json_format_pubsub_event(record)); + return kafka::publish(conn, topic, json_format_pubsub_event(event)); } else { // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine auto w = std::unique_ptr(new Waiter); const auto rc = kafka::publish_with_confirm(conn, topic, - json_format_pubsub_event(record), + json_format_pubsub_event(event), std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); if (rc < 0) { // failed to publish, does not wait for reply diff --git a/src/rgw/rgw_pubsub_push.h b/src/rgw/rgw_pubsub_push.h index 5a2f752c876b1..6cb3db74b332a 100644 --- a/src/rgw/rgw_pubsub_push.h +++ b/src/rgw/rgw_pubsub_push.h @@ -14,7 +14,7 @@ class RGWDataSyncEnv; class RGWCoroutine; class RGWHTTPArgs; struct rgw_pubsub_event; -struct rgw_pubsub_s3_record; +struct rgw_pubsub_s3_event; // endpoint base class all endpoint - types should derive from it class RGWPubSubEndpoint { @@ -37,11 +37,11 @@ public: // this method is used in order to send notification (S3 compliant) and wait for completion // in async manner via a coroutine when invoked in the data sync environment - virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) = 0; + virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) = 0; // this method is used in order to send notification (S3 compliant) and wait for completion // in async manner via a coroutine when invoked in the frontend environment - virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) = 0; + virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) = 0; // present as string virtual std::string to_str() const { return ""; } diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index c1610747c3902..68ac72156ad9a 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -264,38 +264,38 @@ static void make_event_ref(CephContext *cct, const rgw_bucket& bucket, encode_json("info", oevent, &e->info); } -static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket, +static void make_s3_event_ref(CephContext *cct, const rgw_bucket& bucket, const rgw_user& owner, const rgw_obj_key& key, const ceph::real_time& mtime, - const std::vector > *attrs, + const std::vector>* attrs, rgw::notify::EventType event_type, - EventRef *record) { - *record = std::make_shared(); + EventRef* event) { + *event = std::make_shared(); - EventRef& r = *record; - r->eventTime = mtime; - r->eventName = rgw::notify::to_string(event_type); + EventRef& e = *event; + e->eventTime = mtime; + e->eventName = rgw::notify::to_string(event_type); // userIdentity: not supported in sync module // x_amz_request_id: not supported in sync module // x_amz_id_2: not supported in sync module // configurationId is filled from subscription configuration - r->bucket_name = bucket.name; - r->bucket_ownerIdentity = owner.to_str(); - r->bucket_arn = to_string(rgw::ARN(bucket)); - r->bucket_id = bucket.bucket_id; // rgw extension - r->object_key = key.name; + e->bucket_name = bucket.name; + e->bucket_ownerIdentity = owner.to_str(); + e->bucket_arn = to_string(rgw::ARN(bucket)); + e->bucket_id = bucket.bucket_id; // rgw extension + e->object_key = key.name; // object_size not supported in sync module objstore_event oevent(bucket, key, mtime, attrs); - r->object_etag = oevent.get_hash(); - r->object_versionId = key.instance; + e->object_etag = oevent.get_hash(); + e->object_versionId = key.instance; // use timestamp as per key sequence id (hex encoded) const utime_t ts(real_clock::now()); boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), - std::back_inserter(r->object_sequencer)); + std::back_inserter(e->object_sequencer)); - set_event_id(r->id, r->object_etag, ts); + set_event_id(e->id, e->object_etag, ts); } class PSManager; @@ -1022,11 +1022,10 @@ class RGWPSHandleObjEventCR : public RGWCoroutine { const PSEnvRef env; const rgw_user owner; const EventRef event; - const EventRef record; + const EventRef s3_event; const TopicsRef topics; bool has_subscriptions; bool event_handled; - bool sub_conf_found; PSSubscriptionRef sub; std::vector::const_iterator titer; std::set::const_iterator siter; @@ -1036,13 +1035,13 @@ public: const PSEnvRef _env, const rgw_user& _owner, const EventRef& _event, - const EventRef& _record, + const EventRef& _s3_event, const TopicsRef& _topics) : RGWCoroutine(_sc->cct), sc(_sc), env(_env), owner(_owner), event(_event), - record(_record), + s3_event(s3_event), topics(_topics), has_subscriptions(false), event_handled(false) {} @@ -1106,23 +1105,23 @@ public: } } else { // subscription was made by S3 compatible API - ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl; - record->configurationId = sub->sub_conf->s3_id; - record->opaque_data = (*titer)->opaque_data; - yield call(PSSubscription::store_event_cr(sc, sub, record)); + ldout(sc->cct, 20) << "storing s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl; + s3_event->configurationId = sub->sub_conf->s3_id; + s3_event->opaque_data = (*titer)->opaque_data; + yield call(PSSubscription::store_event_cr(sc, sub, s3_event)); if (retcode < 0) { if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); - ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl; + ldout(sc->cct, 1) << "ERROR: failed to store s3 event for subscription=" << *siter << " ret=" << retcode << dendl; } else { if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); event_handled = true; } if (sub->sub_conf->push_endpoint) { - ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl; - yield call(PSSubscription::push_event_cr(sc, sub, record)); + ldout(sc->cct, 20) << "push s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl; + yield call(PSSubscription::push_event_cr(sc, sub, s3_event)); if (retcode < 0) { if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); - ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl; + ldout(sc->cct, 1) << "ERROR: failed to push s3 event for subscription=" << *siter << " ret=" << retcode << dendl; } else { if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); event_handled = true; @@ -1152,7 +1151,7 @@ class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { PSEnvRef env; std::optional versioned_epoch; EventRef event; - EventRef record; + EventRef s3_event; TopicsRef topics; public: RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc, @@ -1179,20 +1178,20 @@ public: } attrs.push_back(std::make_pair(k, attr.second)); } - // at this point we don't know whether we need the ceph event or S3 record + // at this point we don't know whether we need the ceph event or S3 event // this is why both are created here, once we have information about the // subscription, we will store/push only the relevant ones make_event_ref(sc->cct, sync_pipe.info.source_bs.bucket, key, mtime, &attrs, rgw::notify::ObjectCreated, &event); - make_s3_record_ref(sc->cct, + make_s3_event_ref(sc->cct, sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key, mtime, &attrs, - rgw::notify::ObjectCreated, &record); + rgw::notify::ObjectCreated, &s3_event); } - yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, record, topics)); + yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, s3_event, topics)); if (retcode < 0) { return set_cr_error(retcode); } @@ -1278,7 +1277,7 @@ class RGWPSGenericObjEventCBCR : public RGWCoroutine { ceph::real_time mtime; rgw::notify::EventType event_type; EventRef event; - EventRef record; + EventRef s3_event; TopicsRef topics; public: RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc, @@ -1304,18 +1303,18 @@ public: ldout(sc->cct, 20) << "no topics found for " << bucket << "/" << key << dendl; return set_cr_done(); } - // at this point we don't know whether we need the ceph event or S3 record + // at this point we don't know whether we need the ceph event or S3 event // this is why both are created here, once we have information about the // subscription, we will store/push only the relevant ones make_event_ref(sc->cct, bucket, key, mtime, nullptr, event_type, &event); - make_s3_record_ref(sc->cct, + make_s3_event_ref(sc->cct, bucket, owner, key, mtime, nullptr, - event_type, &record); - yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, record, topics)); + event_type, &s3_event); + yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, s3_event, topics)); if (retcode < 0) { return set_cr_error(retcode); } -- 2.39.5